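Below is a sample program that reproduces my problem.
Problem:
- Apply a flatMap transformation to some Observable.
- Subscribe to that Observable and store the subscription (the Disposable) somewhere.
- Dispose of that subscription before the Observable terminates naturally.
- Inside the Observable returned by the mapper function, an Exception is thrown.
- The flatMap operator does not know what to do with the Exception, so it rethrows it and the program crashes/exits.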
Preferred/expected behavior:
- The error should be propagated to my onError handler instead of crashing the program through a call to RxJavaPlugins#onError.
The culprit is the snippet below, found in ObservableFlatMap. The problem is that once the parent has been disposed, the call to addThrowable returns false, so the error is never propagated to onError.
    @Override
    public void onError(Throwable t) {
        if (parent.errors.addThrowable(t)) {
            if (!parent.delayErrors) {
                parent.disposeAll();
            }
            done = true;
            parent.drain();
        } else {
            RxJavaPlugins.onError(t);
        }
    }
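To make the addThrowable behavior concrete, here is a minimal model of an error slot that stops accepting errors once it has been terminated. It uses only the JDK and is a simplified sketch, not the library's actual implementation; the class name TerminalErrorSlot and the TERMINATED sentinel are hypothetical names chosen for illustration.

```java
import java.util.concurrent.atomic.AtomicReference;

public class TerminalErrorSlot {
    // Hypothetical sentinel marking the slot as closed (assumption, for illustration only).
    private static final Throwable TERMINATED = new Throwable("terminated");

    private final AtomicReference<Throwable> slot = new AtomicReference<>();

    /** Returns true if the error was recorded, false if the slot was already terminated. */
    public boolean addThrowable(Throwable t) {
        for (;;) {
            Throwable current = slot.get();
            if (current == TERMINATED) {
                return false; // disposed earlier: the error can no longer reach onError
            }
            if (current != null) {
                current.addSuppressed(t); // keep the first error, attach later ones to it
                return true;
            }
            if (slot.compareAndSet(null, t)) {
                return true;
            }
            // Lost a race with another thread; re-read the slot and retry.
        }
    }

    /** Closes the slot, as disposal would, and returns whatever was stored before. */
    public Throwable terminate() {
        return slot.getAndSet(TERMINATED);
    }

    public static void main(String[] args) {
        TerminalErrorSlot errors = new TerminalErrorSlot();
        System.out.println(errors.addThrowable(new RuntimeException("before dispose"))); // prints true
        errors.terminate();
        System.out.println(errors.addThrowable(new RuntimeException("after dispose")));  // prints false
    }
}
```

In this model the second call falls into the same branch as the else in the onError snippet above: the error has nowhere to go, so it is handed to the global handler.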
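What should I do in this situation? I need an operator that behaves like flatMap but propagates errors to my onError handler instead of crashing my program.
This is a real scenario from my Android app. When the user leaves a window/activity, the subscriptions are disposed of automatically, and exceptions may be thrown after disposal because of InterruptedIOExceptions.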
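As a rough illustration of why exceptions can appear only after disposal, the sketch below uses plain JDK executors. It assumes that disposing a subscription cancels the underlying task with interruption, which is what the FutureTask and ScheduledDirectTask frames in the stack trace further down suggest. The class and method names are hypothetical.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

public class CancelInterruptDemo {
    /** Starts a blocking task, cancels it with interruption, and returns what the task observed. */
    public static Throwable cancelWhileBlocked() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        AtomicReference<Throwable> seen = new AtomicReference<>();
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch finished = new CountDownLatch(1);
        try {
            Future<?> task = executor.submit(() -> {
                started.countDown();
                try {
                    Thread.sleep(10_000); // stands in for blocking I/O
                } catch (InterruptedException e) {
                    seen.set(e); // raised only because the task was cancelled
                } finally {
                    finished.countDown();
                }
            });
            started.await();
            task.cancel(true); // true = interrupt the worker thread, analogous to dispose()
            finished.await(5, TimeUnit.SECONDS);
            return seen.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("caller was interrupted", e);
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(cancelWhileBlocked());
    }
}
```

The exception here is a side effect of the cancellation itself: by the time it is thrown, the consumer has already unsubscribed, so there is no onError handler left to receive it.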
Code to reproduce the problem
    import io.reactivex.Observable;
    import io.reactivex.disposables.Disposable;
    import io.reactivex.plugins.RxJavaPlugins;
    import io.reactivex.schedulers.Schedulers;

    public class Main {
        public static void main(String[] args) throws InterruptedException {
            // Global handler for errors that cannot be delivered to a subscriber
            RxJavaPlugins.setErrorHandler((throwable) -> {
                System.out.println("Please don't come through here");
                throwable.printStackTrace();
            });

            Disposable disposable = Observable.just(1)
                    .subscribeOn(Schedulers.computation())
                    .flatMap((item) -> {
                        return Observable.just(1)
                                .doOnNext((arg) -> Thread.sleep(1000))
                                .doOnNext((arg) -> {
                                    throw new RuntimeException("Error");
                                });
                    })
                    .subscribe(System.out::println, (throwable) -> {
                        System.out.println("Please come through here");
                        throwable.printStackTrace();
                    });

            // Dispose while the inner Observable is still sleeping
            Thread.sleep(500);
            disposable.dispose();
            Thread.sleep(1000);
        }
    }
Actual output
Please don't come through here
io.reactivex.exceptions.UndeliverableException: java.lang.InterruptedException: sleep interrupted
at io.reactivex.plugins.RxJavaPlugins.onError(RxJavaPlugins.java:349)
at io.reactivex.internal.operators.observable.ObservableFlatMap$InnerObserver.onError(ObservableFlatMap.java:573)
at io.reactivex.internal.operators.observable.ObservableDoOnEach$DoOnEachObserver.onError(ObservableDoOnEach.java:119)
at io.reactivex.internal.operators.observable.ObservableDoOnEach$DoOnEachObserver.onError(ObservableDoOnEach.java:119)
at io.reactivex.internal.operators.observable.ObservableDoOnEach$DoOnEachObserver.onNext(ObservableDoOnEach.java:99)
at io.reactivex.internal.operators.observable.ObservableScalarXMap$ScalarDisposable.run(ObservableScalarXMap.java:248)
at io.reactivex.internal.operators.observable.ObservableJust.subscribeActual(ObservableJust.java:35)
at io.reactivex.Observable.subscribe(Observable.java:10903)
at io.reactivex.internal.operators.observable.ObservableDoOnEach.subscribeActual(ObservableDoOnEach.java:42)
at io.reactivex.Observable.subscribe(Observable.java:10903)
at io.reactivex.internal.operators.observable.ObservableDoOnEach.subscribeActual(ObservableDoOnEach.java:42)
at io.reactivex.Observable.subscribe(Observable.java:10903)
at io.reactivex.internal.operators.observable.ObservableFlatMap$MergeObserver.subscribeInner(ObservableFlatMap.java:162)
at io.reactivex.internal.operators.observable.ObservableFlatMap$MergeObserver.onNext(ObservableFlatMap.java:139)
at io.reactivex.internal.operators.observable.ObservableSubscribeOn$SubscribeOnObserver.onNext(ObservableSubscribeOn.java:58)
at io.reactivex.internal.operators.observable.ObservableScalarXMap$ScalarDisposable.run(ObservableScalarXMap.java:248)
at io.reactivex.internal.operators.observable.ObservableJust.subscribeActual(ObservableJust.java:35)
at io.reactivex.Observable.subscribe(Observable.java:10903)
at io.reactivex.internal.operators.observable.ObservableSubscribeOn$SubscribeTask.run(ObservableSubscribeOn.java:96)
at io.reactivex.internal.schedulers.ScheduledDirectTask.call(ScheduledDirectTask.java:38)
at io.reactivex.internal.schedulers.ScheduledDirectTask.call(ScheduledDirectTask.java:26)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.InterruptedException: sleep interrupted
at java.lang.Thread.sleep(Native Method)
at Main.lambda$null$1(Main.java:18)
at io.reactivex.internal.operators.observable.ObservableDoOnEach$DoOnEachObserver.onNext(ObservableDoOnEach.java:95)
... 22 more
Expected/preferred output
Please come through here
java.lang.RuntimeException: Error
at Main.lambda$null$2(Main.java:19)
at io.reactivex.internal.operators.observable.ObservableDoOnEach$DoOnEachObserver.onNext(ObservableDoOnEach.java:95)
at io.reactivex.internal.operators.observable.ObservableDoOnEach$DoOnEachObserver.onNext(ObservableDoOnEach.java:103)
at io.reactivex.internal.operators.observable.ObservableScalarXMap$ScalarDisposable.run(ObservableScalarXMap.java:248)
at io.reactivex.internal.operators.observable.ObservableJust.subscribeActual(ObservableJust.java:35)
at io.reactivex.Observable.subscribe(Observable.java:10903)
at io.reactivex.internal.operators.observable.ObservableDoOnEach.subscribeActual(ObservableDoOnEach.java:42)
at io.reactivex.Observable.subscribe(Observable.java:10903)
at io.reactivex.internal.operators.observable.ObservableDoOnEach.subscribeActual(ObservableDoOnEach.java:42)
at io.reactivex.Observable.subscribe(Observable.java:10903)
at io.reactivex.internal.operators.observable.ObservableFlatMap$MergeObserver.subscribeInner(ObservableFlatMap.java:162)
at io.reactivex.internal.operators.observable.ObservableFlatMap$MergeObserver.onNext(ObservableFlatMap.java:139)
at io.reactivex.internal.operators.observable.ObservableSubscribeOn$SubscribeOnObserver.onNext(ObservableSubscribeOn.java:58)
at io.reactivex.internal.operators.observable.ObservableScalarXMap$ScalarDisposable.run(ObservableScalarXMap.java:248)
at io.reactivex.internal.operators.observable.ObservableJust.subscribeActual(ObservableJust.java:35)
at io.reactivex.Observable.subscribe(Observable.java:10903)
at io.reactivex.internal.operators.observable.ObservableSubscribeOn$SubscribeTask.run(ObservableSubscribeOn.java:96)
at io.reactivex.internal.schedulers.ScheduledDirectTask.call(ScheduledDirectTask.java:38)
at io.reactivex.internal.schedulers.ScheduledDirectTask.call(ScheduledDirectTask.java:26)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)
1 Answer
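The problem is the .subscribeOn(Schedulers.computation()) placed before flatMap. When you dispose, the thread that is producing the flatMap's inner Observable gets interrupted, which breaks the whole subscription. To solve this, specify subscribeOn after or inside flatMap, or observe on another thread. Working example: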