我有一个使用ConnectableObservable的应用程序,该应用程序运行很长时间 . 一段时间后,它的观察者停止在onNext()方法中停止收到通知 .
我编写了以下测试,简化了示例 . 它只是一个具有无限循环的ConnectableObservable,其中一个订阅者使用observeOn和subscribeon . 在128 s.onNext(1)
次呼叫后,它停止通知观察者 .
@Test
public void testHotObservable() throws InterruptedException{
CountDownLatch latch = new CountDownLatch(1);
ConnectableObservable<Integer> observable = Observable.<Integer>create( (s) -> {
while(true){
try {
Thread.sleep(500);
} catch (Exception e) {
e.printStackTrace();
}
s.onNext(1);
}
})
.observeOn(Schedulers.io())
.subscribeOn(Schedulers.io())
.publish();
Observer<Integer> observer = new Observer<Integer>() {
@Override
public void onNext(Integer i) {
System.out.println("got "+i);
}
@Override
public void onCompleted() {
System.out.println("completed");
}
@Override
public void onError(Throwable e) {
e.printStackTrace();
}
};
observable.subscribe(observer);
observable.connect();
latch.await();
}
这就是我所看到的调试RxJava的代码,我发现它没有调用Observer的onNext()方法,但我不理解它:
1.- s.onNext(1);
被称为:
2.-执行到 rx.internal.operators.OperatorObserveOn.ObserveOnSubscriber.pollQueue()
:
void pollQueue() {
int emitted = 0;
final AtomicLong localRequested = this.requested;
final AtomicLong localCounter = this.counter;
do {
localCounter.set(1);
long produced = 0;
long r = localRequested.get();
for (;;) {
...
System.out.println("R: "+r);
if (r > 0) {
Object o = queue.poll();
if (o != null) {
child.onNext(on.getValue(o));
r--;
The problem is the value of r . 它第一次执行它的值总是128.每次调用后它减1( r--
) . 这意味着 ConnectableObservable can only notify its observers 128 times when using both observeOn and subscribeOn . 如果我删除subscribeOn,r的值将在每次迭代时开始,并且它可以正常工作 .
UPDATE :
我找到了一个解决方案:问题是由 .observerOn().subscribeOn()
的顺序引起的 . 如果我将它反转为 .subscribeOn().observeOn()
它可以工作(我可以看到 r
的值总是重置为128) .
无论如何,我很感激解释 .
1 回答
许多异步运算符使用内部固定大小的缓冲区,并依赖于请求后续的订阅者 . 在你的情况下,某些东西没有正确地请求,我不能说它是什么 . 我建议您尝试使用标准组件的用例来查看可能出现的错误,即您可以使用PublishSubject示例替换自定义Observable: