首页 文章

RxJava:observeOn,subscribeOn和doFinally,在IO和UI线程之间切换

提问于
浏览
4

我遇到了一个问题,我的observable在IO线程上订阅并在android主(UI)线程上观察但是 doFinally 运算符在IO线程上运行,它需要在UI线程上运行 .

用例与medium article几乎完全相同 .

我基本上想要在Observable订阅时显示 ProgressBar 并在Observable终止或结束时隐藏 ProgressBar .

我得到的错误是: java.lang.IllegalStateException: The current thread must have a looper!

任何人都可以帮助我将 doFinally 动作移回具有looper的UI线程吗?或者我错过了其他一些信息?

EDIT 用例工作流程是:

  • 启动活动

  • 初始化

  • 执行可观察流

  • 启动新活动并完成当前活动

  • 新活动

  • 开始原始活动并完成

  • 重复初始化

非常感谢你 .

细节:

  • RxJava 2.0.7

  • RxAndroid 2.0.1

  • Android sdk分14和目标25

示例代码

listUseCase.execute(null)
            .doOnSubscribe(new Consumer<Disposable>() {
                @Override
                public void accept(@NonNull Disposable disposable) throws Exception {
                    getView().showLoading(true);
                }
            })
            .doFinally(new Action() {
                @Override
                public void run() throws Exception {
                    getView().showLoading(false);
                }
            })
            .subscribeOn(schedulerProvider.io())
            .observeOn(schedulerProvider.main())
            .subscribe(
                    new Consumer<List<AccountEntity>>() {
                        @Override
                        public void accept(@NonNull List<AccountEntity> accountEntities) throws Exception {
                            getView().setAccounts(accountEntities);
                        }
                    },
                    new Consumer<Throwable>() {
                        @Override
                        public void accept(@NonNull Throwable throwable) throws Exception {
                            if (isViewAttached()) {
                                getView().showError(throwable.getMessage());
                            }
                        }
                    }
            );

堆栈跟踪:

致命异常:RxCachedThreadScheduler-1进程:com.example.android.demo.customerfirst.alpha,PID:16685 java.lang.IllegalStateException:当前线程必须有一个looper!在android.view.Choreographer $ 1.initialValue(Choreographer.java:96)的android.view.Choreographer $ 1.initialValue(Choreographer.java:91)at java.lang.ThreadLocal $ Values.getAfterMiss(ThreadLocal.java:430)at at java.lang.ThreadLocal.get(ThreadLocal.java:65)在android.view.Choreographer.getInstance(Choreographer.java:192)的android.animation.ValueAnimator $ AnimationHandler . (ValueAnimator.java:600)在android.animation . 在Android.graphics.drawable.AnimatedVectorDrawable的android.animation.ValueAnimator.end(ValueAnimator.java:998)的android.animation.ValueAnimator.getOrCreateAnimationHandler(ValueAnimator.java:1366)上的ValueAnimator $ AnimationHandler . (ValueAnimator.java:575) . 停止(AnimatedVectorDrawable.java:439)android.widget.ProgressBar.stopAnimation(ProgressBar.java:1523)android.view.ProgressBar.onVisibilityChanged(ProgressBar.java:1583)android.view.View.dispatchVisibilityChanged(View.java) :8643)在android.view.View.setFlags(View.java:9686)的android.view.View.setVisibility(Vi) ew.java:6663)位于com.example的com.example.android.demo.customerfirst.featuresstore.list.ProductListActivity.showLoading(ProductListActivity.java:121)的android.widget.ProgressBar.setVisibility(ProgressBar.java:1563) .android.demo.customerfirst.featuresstore.list.ProductListPresenterMediator $ 3.run(ProductListPresenterMediator.java:56)位于io.reactivex的io.reactivex.internal.operators.observable.ObservableDoFinally $ DoFinallyObserver.runFinally(ObservableDoFinally.java:144) . internal.operators.observable.ObservableDoFinally $ DoFinallyObserver.onComplete(ObservableDoFinally.java:94)at io.reactivex.internal.observers.DisposableLambdaObserver.onComplete(DisposableLambdaObserver.java:73)at io.reactivex.internal.observers.DeferredScalarDisposable.complete( DeferScalarDisposable.java:84)io.reactivex.internalx.internal.ope上的io.reactivex.internal.operators.observable.ObservableFromCallable.subscribeActual(ObservableFromCallable.java:52)at io.reactivex.Observable.subscribe(Observable.java:10700)在io.reactivex.Observable.subscribe(Observable.java:10700)的io.reactivex.internal.operators.observable.ObservableDoFinally.subscribeActual(ObservableDoFinally.java:45)中的rators.observable.ObservableDoOnLifecycle.subscribeActual(ObservableDoOnLifecycle.java:33) at io.reactivex.Observable.subscribe(Observable.java:10700)at io.reactivex.internal.operators.observable.ObservableSubscribeOn $ 1.run(ObservableSubscribeOn.java:39)at io.reactivex.Scheduler $ 1.run(Scheduler.java) :138)ato.reactivex.internal.schedulers.ScheduledRunnable.run(ScheduledRunnable.java:59)at io.reactivex.internal.schedulers.ScheduledRunnable.call(ScheduledRunnable.java:51)at java.util.concurrent.FutureTask . run(FutureTask.java:237)java.util.concurrent.ScheduledThreadPoolExecutor $ ScheduledFutureTask.access $ 201(ScheduledThreadPoolExecutor.java:152)java.util.concurrent.ScheduledThreadPoolExecutor $ ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:265)at java . util.concurrent.ThreadPoolExecutor.runWorker(TH readPoolExecutor.java:1112)java.lang.Thread.run上的java.util.concurrent.ThreadPoolExecutor $ Worker.run(ThreadPoolExecutor.java:587)(Thread.java:818)

2 回答

  • 2

    您需要做的就是将 observeOn 向上移动 . observeOn 方法更改了调用 onNextonErroronCompleted 的线程,其内部操作和副作用如何工作(通过提升)

    listUseCase.execute(null)
                .subscribeOn(schedulerProvider.io()) // Move subscribe on here
                .observeOn(schedulerProvider.main()) // Change threads here
                .doOnSubscribe(new Consumer<Disposable>() {
                    @Override
                    public void accept(@NonNull Disposable disposable) throws Exception {
                        getView().showLoading(true); // This should be on the main thread also
                    }
                })
                .doFinally(new Action() {
                    @Override
                    public void run() throws Exception {
                        getView().showLoading(false);
                    }
                })
    
                .subscribe(
                        new Consumer<List<AccountEntity>>() {
                            @Override
                            public void accept(@NonNull List<AccountEntity> accountEntities) throws Exception {
                                getView().setAccounts(accountEntities);
                            }
                        },
                        new Consumer<Throwable>() {
                            @Override
                            public void accept(@NonNull Throwable throwable) throws Exception {
                                if (isViewAttached()) {
                                    getView().showError(throwable.getMessage());
                                }
                            }
                        }
                );
    
  • 2

    问题出现了,因为我没有在活动完成/销毁时处理订阅 .

    现在,每个活动/视图告诉演示者何时停止或销毁它们并且演示者处理订阅 .

    这似乎解决了我的问题 .

    @Override
    public void initialize() {
        if (!isViewAttached()) {
            throw new ViewNotAttachedException();
        }
        disposable = listUseCase.execute(null)
                .subscribeOn(schedulerProvider.io()) // Move subscribe on here
                .observeOn(schedulerProvider.main()) // Change threads here
                .doOnSubscribe(new Consumer<Disposable>() {
                    @Override
                    public void accept(@NonNull Disposable disposable) throws Exception {
                        getView().showLoading(true); // This should be on the main thread also
                    }
                })
                .doFinally(new Action() {
                    @Override
                    public void run() throws Exception {
                        getView().showLoading(false);
                    }
                })
                .subscribe(
                        new Consumer<List<AccountEntity>>() {
                            @Override
                            public void accept(@NonNull List<AccountEntity> accountEntities) throws Exception {
                                getView().setAccounts(accountEntities);
                            }
                        },
                        new Consumer<Throwable>() {
                            @Override
                            public void accept(@NonNull Throwable throwable) throws Exception {
                                if (isViewAttached()) {
                                    getView().showError(throwable.getMessage());
                                }
                            }
                        }
                );
    }
    
    @Override
    public void dispose() {
        if (disposable != null) {
            disposable.dispose();
        }
    }
    

相关问题