首页 文章

查询RxJava线程调度

提问于
浏览
0

我是RxJava2的新手 . 在下面的代码中,我无法理解订阅者如何在后台线程上工作,即使Observable / Flowable在主线程上发出并且没有指定Scheduler(使用subscribeOn(Schedulers . *)调用) . 完整的代码可以在this github repo.找到

@OnClick(R.id.btn_start_simple_polling)
public void onStartSimplePollingClicked() {

    _log("onStartSimplePollingClicked called on ");  //MAIN THREAD

    final int pollCount = POLL_COUNT;

    Disposable d = Observable
          .interval(INITIAL_DELAY, POLLING_INTERVAL, TimeUnit.MILLISECONDS)
          .map(this::_doNetworkCallAndGetStringResult)
          .take(pollCount)
          .doOnSubscribe(subscription -> {
              _log(String.format("Start simple polling - %s", _counter));     //MAIN THREAD
          })
          .subscribe(taskName -> {
              _log(String.format(Locale.US,
                                 "Executing polled task [%s] now time : [xx:%02d]",
                                 taskName,
                                 _getSecondHand()));
          });

    _disposables.add(d);
}

private String _doNetworkCallAndGetStringResult(long attempt) {
    try {
        _log("_doNetworkCallAndGetStringResult called on "); //BACKGROUND THREAD
        if (attempt == 4) {
            // randomly make one event super long so we test that the repeat logic waits
            // and accounts for this.
            Thread.sleep(9000);
        }
        else {
            Thread.sleep(3000);
        }

    } catch (InterruptedException e) {
        Timber.d("Operation was interrupted");
    }
    _counter++;

    return String.valueOf(_counter);
}

2 回答

  • 4

    由于您未指定要将RxJava默认订阅到同步订阅的调度程序 . 所以对 onSubscribedoOnSubscribe 的调用发生在主线程上 .

    但是, Observable.interval 运算符需要隐式或显式调度程序来广播 onNext 事件 . 由于您未指定调度程序,因此默认为 Schedulers.computation() . 在间隔触发后,它继续在同一计算线程上调用 _doNetworkCallAndGetStringResult ,从而在后台发生 .

  • 0

    默认情况下RxJava同步运行,但@Kiskae的一些运营商已经告诉你间隔,延迟或其他一些

    如果你想异步运行一个管道,你将不得不使用observerOn,这将使得管道在另一个线程中运行一旦放入你的管道

    /**
         * Once that you set in your pipeline the observerOn all the next steps of your pipeline will be executed in another thread.
         * Shall print
         * First step main
         * Second step RxNewThreadScheduler-2
         * Third step RxNewThreadScheduler-1
         */
        @Test
        public void testObservableObserverOn() throws InterruptedException {
            Subscription subscription = Observable.just(1)
                    .doOnNext(number -> System.out.println("First step " + Thread.currentThread()
                            .getName()))
                    .observeOn(Schedulers.newThread())
                    .doOnNext(number -> System.out.println("Second step " + Thread.currentThread()
                            .getName()))
                    .observeOn(Schedulers.newThread())
                    .doOnNext(number -> System.out.println("Third step " + Thread.currentThread()
                            .getName()))
                    .subscribe();
            new TestSubscriber((Observer) subscription)
                    .awaitTerminalEvent(100, TimeUnit.MILLISECONDS);
        }
    

    或者使用subscribeOn,它将使您的管道在您指定的线程中运行

    /**
     * Does not matter at what point in your pipeline you set your subscribeOn, once that is set in the pipeline,
     * all steps will be executed in another thread.
     * Shall print
     * First step RxNewThreadScheduler-1
     * Second step RxNewThreadScheduler-1
     */
    @Test
    public void testObservableSubscribeOn() throws InterruptedException {
        Subscription subscription = Observable.just(1)
                .doOnNext(number -> System.out.println("First step " + Thread.currentThread()
                        .getName()))
                .subscribeOn(Schedulers.newThread())
                .doOnNext(number -> System.out.println("Second step " + Thread.currentThread()
                        .getName()))
                .subscribe();
        new TestSubscriber((Observer) subscription)
                .awaitTerminalEvent(100, TimeUnit.MILLISECONDS);
    }
    

    你可以在这里看到更多关于async rxJava的例子https://github.com/politrons/reactive/blob/master/src/test/java/rx/observables/scheduler/ObservableAsynchronous.java

相关问题