首页 文章

RxJava - 当使用observeOn和subscribeOn时,ConnectableObservable无法通知其观察者超过128次

提问于
浏览
0

我有一个使用ConnectableObservable的应用程序,该应用程序运行很长时间 . 一段时间后,它的观察者停止在onNext()方法中停止收到通知 .

我编写了以下测试,简化了示例 . 它只是一个具有无限循环的ConnectableObservable,其中一个订阅者使用observeOn和subscribeon . 在128 s.onNext(1) 次呼叫后,它停止通知观察者 .

@Test
public void testHotObservable() throws InterruptedException{

    CountDownLatch latch = new CountDownLatch(1);

    ConnectableObservable<Integer> observable = Observable.<Integer>create( (s) -> {
        while(true){
            try {
                Thread.sleep(500);
            } catch (Exception e) {
                e.printStackTrace();
            }
            s.onNext(1);
        }
    })
    .observeOn(Schedulers.io())
    .subscribeOn(Schedulers.io())
    .publish();

    Observer<Integer> observer = new Observer<Integer>() {
        @Override
        public void onNext(Integer i) {
            System.out.println("got "+i);
        }
        @Override
        public void onCompleted() {
            System.out.println("completed");
        }

        @Override
        public void onError(Throwable e) {
            e.printStackTrace();
        }
    };

    observable.subscribe(observer);
    observable.connect();

    latch.await();
}

这就是我所看到的调试RxJava的代码,我发现它没有调用Observer的onNext()方法,但我不理解它:

1.- s.onNext(1); 被称为:

2.-执行到 rx.internal.operators.OperatorObserveOn.ObserveOnSubscriber.pollQueue()

void pollQueue() {
    int emitted = 0;
    final AtomicLong localRequested = this.requested;
    final AtomicLong localCounter = this.counter;
    do {
        localCounter.set(1);
        long produced = 0;
        long r = localRequested.get();            
        for (;;) {
            ...
            System.out.println("R: "+r);
            if (r > 0) {
                Object o = queue.poll();
                if (o != null) {
                    child.onNext(on.getValue(o));
                    r--;

The problem is the value of r . 它第一次执行它的值总是128.每次调用后它减1( r-- ) . 这意味着 ConnectableObservable can only notify its observers 128 times when using both observeOn and subscribeOn . 如果我删除subscribeOn,r的值将在每次迭代时开始,并且它可以正常工作 .

UPDATE

我找到了一个解决方案:问题是由 .observerOn().subscribeOn() 的顺序引起的 . 如果我将它反转为 .subscribeOn().observeOn() 它可以工作(我可以看到 r 的值总是重置为128) .

无论如何,我很感激解释 .

1 回答

  • 1

    许多异步运算符使用内部固定大小的缓冲区,并依赖于请求后续的订阅者 . 在你的情况下,某些东西没有正确地请求,我不能说它是什么 . 我建议您尝试使用标准组件的用例来查看可能出现的错误,即您可以使用PublishSubject示例替换自定义Observable:

    Subject<Integer, Integer> source = PublishSubject.<Integer>create().toSerialized();
    
    ConnectableObservable<Integer> co = source.sample(
        500, TimeUnit.MILLISECONDS, Schedulers.io())
    .onBackpressureBuffer().publish();
    
    co.subscribe(yourSubscriber);
    co.connect();
    
    source.onNext(1);
    

相关问题