我是RxJava2的新手 . 在下面的代码中,我无法理解订阅者如何在后台线程上工作,即使Observable / Flowable在主线程上发出并且没有指定Scheduler(使用subscribeOn(Schedulers . *)调用) . 完整的代码可以在this github repo.找到
@OnClick(R.id.btn_start_simple_polling)
public void onStartSimplePollingClicked() {
_log("onStartSimplePollingClicked called on "); //MAIN THREAD
final int pollCount = POLL_COUNT;
Disposable d = Observable
.interval(INITIAL_DELAY, POLLING_INTERVAL, TimeUnit.MILLISECONDS)
.map(this::_doNetworkCallAndGetStringResult)
.take(pollCount)
.doOnSubscribe(subscription -> {
_log(String.format("Start simple polling - %s", _counter)); //MAIN THREAD
})
.subscribe(taskName -> {
_log(String.format(Locale.US,
"Executing polled task [%s] now time : [xx:%02d]",
taskName,
_getSecondHand()));
});
_disposables.add(d);
}
private String _doNetworkCallAndGetStringResult(long attempt) {
try {
_log("_doNetworkCallAndGetStringResult called on "); //BACKGROUND THREAD
if (attempt == 4) {
// randomly make one event super long so we test that the repeat logic waits
// and accounts for this.
Thread.sleep(9000);
}
else {
Thread.sleep(3000);
}
} catch (InterruptedException e) {
Timber.d("Operation was interrupted");
}
_counter++;
return String.valueOf(_counter);
}
2 回答
由于您未指定要将RxJava默认订阅到同步订阅的调度程序 . 所以对
onSubscribe
和doOnSubscribe
的调用发生在主线程上 .但是,
Observable.interval
运算符需要隐式或显式调度程序来广播onNext
事件 . 由于您未指定调度程序,因此默认为Schedulers.computation()
. 在间隔触发后,它继续在同一计算线程上调用_doNetworkCallAndGetStringResult
,从而在后台发生 .默认情况下RxJava同步运行,但@Kiskae的一些运营商已经告诉你间隔,延迟或其他一些
如果你想异步运行一个管道,你将不得不使用observerOn,这将使得管道在另一个线程中运行一旦放入你的管道
或者使用subscribeOn,它将使您的管道在您指定的线程中运行
你可以在这里看到更多关于async rxJava的例子https://github.com/politrons/reactive/blob/master/src/test/java/rx/observables/scheduler/ObservableAsynchronous.java