
Error handling when using Observable.flatMap


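Below is a sample program that reproduces my problem.

Problem:

  • A flatMap transformation is applied to some Observable

  • The resulting Observable is subscribed to, and the subscription is stored somewhere

  • The subscription is disposed before the Observable terminates naturally

  • An Exception is thrown inside the Observable returned by the mapper function

  • The flatMap operator does not know what to do with the Exception and raises it, so the program crashes/exits

Preferred/expected behavior:

  • The error should propagate to my onError handler instead of crashing the program through a call to RxJavaPlugins#onError

The culprit is the code snippet below, found in ObservableFlatMap. Once the parent has been disposed, the call to addThrowable returns false, so the error is never propagated to onError.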

@Override
public void onError(Throwable t) {
  if (parent.errors.addThrowable(t)) {
      if (!parent.delayErrors) {
          parent.disposeAll();
      }
      done = true;
      parent.drain();
  } else {
      RxJavaPlugins.onError(t);
  }
}
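
To make the failure mode concrete, here is a simplified model of the check, not RxJava's actual code. The class name ErrorSlot and the TERMINATED sentinel are hypothetical and exist only for illustration. The assumption is that disposing the parent closes the error container, after which addThrowable can only return false.

```java
import java.util.concurrent.atomic.AtomicReference;

public class ErrorSlotDemo {

  /** Hypothetical stand-in for a terminable error container; not RxJava code. */
  static final class ErrorSlot {
    // Sentinel that marks the slot as closed (assumed to happen on dispose).
    private static final Throwable TERMINATED = new Throwable("terminated");
    private final AtomicReference<Throwable> ref = new AtomicReference<>();

    /** Accepts the error while the slot is open; returns false once terminated. */
    boolean addThrowable(Throwable t) {
      for (;;) {
        Throwable current = ref.get();
        if (current == TERMINATED) {
          return false; // closed: the caller has to route the error elsewhere
        }
        if (current != null) {
          if (current != t) {
            current.addSuppressed(t); // keep the first error, attach later ones
          }
          return true;
        }
        if (ref.compareAndSet(null, t)) {
          return true;
        }
        // Lost a race with another thread; re-read and retry.
      }
    }

    /** Closes the slot and returns whatever error was stored, if any. */
    Throwable terminate() {
      return ref.getAndSet(TERMINATED);
    }
  }

  public static void main(String[] args) {
    ErrorSlot open = new ErrorSlot();
    System.out.println(open.addThrowable(new RuntimeException("Error"))); // true

    ErrorSlot disposed = new ErrorSlot();
    disposed.terminate();
    System.out.println(disposed.addThrowable(new RuntimeException("Error"))); // false
  }
}
```

In this model the second call corresponds to the else branch above, where the error is handed to RxJavaPlugins.onError.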

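What should I do in this situation? I need an operator that behaves like flatMap but propagates errors to my onError handler instead of crashing my program.

This is a real scenario in my Android application. When the user leaves a window/activity, subscriptions are disposed automatically, and exceptions may be thrown after disposal because of InterruptedIOExceptions.

Code to reproduce the problem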

import io.reactivex.Observable;
import io.reactivex.disposables.Disposable;
import io.reactivex.plugins.RxJavaPlugins;
import io.reactivex.schedulers.Schedulers;

public class Main {

  public static void main(String[] args) throws InterruptedException {
    RxJavaPlugins.setErrorHandler((throwable)->{
      System.out.println("Please don't come through here");
      throwable.printStackTrace();
    });
    Disposable disposable = Observable.just(1)
        .subscribeOn(Schedulers.computation())
        .flatMap((item)->{
          return Observable.just(1)
              .doOnNext((arg)->Thread.sleep(1000))
              .doOnNext((arg)->{
                throw new RuntimeException("Error");
              });
        })
        .subscribe(System.out::println, (throwable)->{
          System.out.println("Please come through here");
          throwable.printStackTrace();
        });
    Thread.sleep(500);
    disposable.dispose();
    Thread.sleep(1000);
  }

}

Actual output

Please don't come through here
io.reactivex.exceptions.UndeliverableException: java.lang.InterruptedException: sleep interrupted
  at io.reactivex.plugins.RxJavaPlugins.onError(RxJavaPlugins.java:349)
  at io.reactivex.internal.operators.observable.ObservableFlatMap$InnerObserver.onError(ObservableFlatMap.java:573)
  at io.reactivex.internal.operators.observable.ObservableDoOnEach$DoOnEachObserver.onError(ObservableDoOnEach.java:119)
  at io.reactivex.internal.operators.observable.ObservableDoOnEach$DoOnEachObserver.onError(ObservableDoOnEach.java:119)
  at io.reactivex.internal.operators.observable.ObservableDoOnEach$DoOnEachObserver.onNext(ObservableDoOnEach.java:99)
  at io.reactivex.internal.operators.observable.ObservableScalarXMap$ScalarDisposable.run(ObservableScalarXMap.java:248)
  at io.reactivex.internal.operators.observable.ObservableJust.subscribeActual(ObservableJust.java:35)
  at io.reactivex.Observable.subscribe(Observable.java:10903)
  at io.reactivex.internal.operators.observable.ObservableDoOnEach.subscribeActual(ObservableDoOnEach.java:42)
  at io.reactivex.Observable.subscribe(Observable.java:10903)
  at io.reactivex.internal.operators.observable.ObservableDoOnEach.subscribeActual(ObservableDoOnEach.java:42)
  at io.reactivex.Observable.subscribe(Observable.java:10903)
  at io.reactivex.internal.operators.observable.ObservableFlatMap$MergeObserver.subscribeInner(ObservableFlatMap.java:162)
  at io.reactivex.internal.operators.observable.ObservableFlatMap$MergeObserver.onNext(ObservableFlatMap.java:139)
  at io.reactivex.internal.operators.observable.ObservableSubscribeOn$SubscribeOnObserver.onNext(ObservableSubscribeOn.java:58)
  at io.reactivex.internal.operators.observable.ObservableScalarXMap$ScalarDisposable.run(ObservableScalarXMap.java:248)
  at io.reactivex.internal.operators.observable.ObservableJust.subscribeActual(ObservableJust.java:35)
  at io.reactivex.Observable.subscribe(Observable.java:10903)
  at io.reactivex.internal.operators.observable.ObservableSubscribeOn$SubscribeTask.run(ObservableSubscribeOn.java:96)
  at io.reactivex.internal.schedulers.ScheduledDirectTask.call(ScheduledDirectTask.java:38)
  at io.reactivex.internal.schedulers.ScheduledDirectTask.call(ScheduledDirectTask.java:26)
  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
  at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
  at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
  at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.InterruptedException: sleep interrupted
  at java.lang.Thread.sleep(Native Method)
  at Main.lambda$null$1(Main.java:18)
  at io.reactivex.internal.operators.observable.ObservableDoOnEach$DoOnEachObserver.onNext(ObservableDoOnEach.java:95)
  ... 22 more

Expected/preferred output

Please come through here
java.lang.RuntimeException: Error
  at Main.lambda$null$2(Main.java:19)
  at io.reactivex.internal.operators.observable.ObservableDoOnEach$DoOnEachObserver.onNext(ObservableDoOnEach.java:95)
  at io.reactivex.internal.operators.observable.ObservableDoOnEach$DoOnEachObserver.onNext(ObservableDoOnEach.java:103)
  at io.reactivex.internal.operators.observable.ObservableScalarXMap$ScalarDisposable.run(ObservableScalarXMap.java:248)
  at io.reactivex.internal.operators.observable.ObservableJust.subscribeActual(ObservableJust.java:35)
  at io.reactivex.Observable.subscribe(Observable.java:10903)
  at io.reactivex.internal.operators.observable.ObservableDoOnEach.subscribeActual(ObservableDoOnEach.java:42)
  at io.reactivex.Observable.subscribe(Observable.java:10903)
  at io.reactivex.internal.operators.observable.ObservableDoOnEach.subscribeActual(ObservableDoOnEach.java:42)
  at io.reactivex.Observable.subscribe(Observable.java:10903)
  at io.reactivex.internal.operators.observable.ObservableFlatMap$MergeObserver.subscribeInner(ObservableFlatMap.java:162)
  at io.reactivex.internal.operators.observable.ObservableFlatMap$MergeObserver.onNext(ObservableFlatMap.java:139)
  at io.reactivex.internal.operators.observable.ObservableSubscribeOn$SubscribeOnObserver.onNext(ObservableSubscribeOn.java:58)
  at io.reactivex.internal.operators.observable.ObservableScalarXMap$ScalarDisposable.run(ObservableScalarXMap.java:248)
  at io.reactivex.internal.operators.observable.ObservableJust.subscribeActual(ObservableJust.java:35)
  at io.reactivex.Observable.subscribe(Observable.java:10903)
  at io.reactivex.internal.operators.observable.ObservableSubscribeOn$SubscribeTask.run(ObservableSubscribeOn.java:96)
  at io.reactivex.internal.schedulers.ScheduledDirectTask.call(ScheduledDirectTask.java:38)
  at io.reactivex.internal.schedulers.ScheduledDirectTask.call(ScheduledDirectTask.java:26)
  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
  at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
  at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
  at java.lang.Thread.run(Thread.java:748)

1 Answer

  • 1

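    The problem is the .subscribeOn(Schedulers.computation()) placed before flatMap. When you dispose, you interrupt the thread that is running the flatMap Observable, which breaks the whole subscription. To fix this, apply subscribeOn after or inside flatMap, or observe on another thread.

    Working example: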

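    import io.reactivex.Observable;
    import io.reactivex.disposables.Disposable;
    import io.reactivex.plugins.RxJavaPlugins;
    import io.reactivex.schedulers.Schedulers;

    public class Main {

      public static void main(String[] args) throws InterruptedException {
        RxJavaPlugins.setErrorHandler((throwable) -> {
          System.out.println("Please don't come through here");
          throwable.printStackTrace();
        });
        Disposable disposable = Observable.just(1)
            .flatMap((item) -> {
              return Observable.just(1)
                  .doOnNext((arg) -> Thread.sleep(1000))
                  .doOnNext((arg) -> {
                    throw new IllegalStateException("Error");
                  })
                  // subscribeOn now sits inside flatMap, on the inner Observable
                  .subscribeOn(Schedulers.computation());
            })
            .subscribe(System.out::println, (throwable) -> {
              System.out.println("Please come through here");
              throwable.printStackTrace();
            });
        Thread.sleep(500);
        disposable.dispose();
        Thread.sleep(1000);
      }

    }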
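
The interruption described above can be seen without RxJava. The following is a minimal plain-Java sketch, assuming that disposing the subscription behaves like cancelling a scheduled task with interruption. The class and method names (InterruptOnCancelDemo, runAndCancel) are made up for this illustration. It shows why the stack trace in the question reports an InterruptedException from Thread.sleep rather than the RuntimeException("Error") that would have been thrown next.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

public class InterruptOnCancelDemo {

  /** Starts a sleeping task, cancels it with interruption, and returns what the task threw. */
  static Throwable runAndCancel() {
    ExecutorService pool = Executors.newSingleThreadExecutor();
    AtomicReference<Throwable> seen = new AtomicReference<>();
    CountDownLatch started = new CountDownLatch(1);
    try {
      Future<?> task = pool.submit(() -> {
        started.countDown();
        try {
          Thread.sleep(1000);                 // stands in for the first doOnNext
          throw new RuntimeException("Error"); // stands in for the second doOnNext
        } catch (Throwable t) {
          seen.set(t);                        // record whichever failure came first
        }
      });
      started.await();   // make sure the task is running before cancelling
      task.cancel(true); // cancel with interruption, the assumed analogue of dispose()
      pool.shutdown();
      pool.awaitTermination(5, TimeUnit.SECONDS);
      return seen.get();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("demo thread was interrupted", e);
    } finally {
      pool.shutdownNow();
    }
  }

  public static void main(String[] args) {
    // prints java.lang.InterruptedException
    System.out.println(runAndCancel().getClass().getName());
  }
}
```

The sleep is cut short by the interrupt, so the code that would have thrown RuntimeException("Error") never runs.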
    
