首页 文章

Schedulers.io()没有返回主线程

提问于
浏览
4

我正在使用RxParse来解析查询的异步加载但是当我使用subscribeOn(Schedulers.io())订阅我的observable时,我的onCompleted方法永远不会在主线程上调用 . 而不是这个,我的onCompleted方法在工作线程池内调用 . 如果我使用observeOn(AndroidSchedulers.mainThread),一切都会工作,但我的onNextMethod也将在主线程上调用,我不想要它 .

我的代码有问题吗?

我的代码有什么问题吗?

ParseObservable.find(myQuery)
    .map(myMapFunc())
    .subscribeOn(AndroidSchedulers.handlerThread(new Handler()))
    .subscribe(
        new Subscriber<MyObj>() {
           @Override
            public void onError(Throwable e) {
                Log.e("error","error",e);
            }

            @Override
            public void onNext(T t) {
                // ... worker thread (but here is ok)
            }

            public void onCompleted() {
                // ... worker thread again instead of mainThread
            }
        }
    )
);

4 回答

  • 9

    不幸的是,订阅在所有方法的相同线程中( onNextonErroronCompleted

    但你可以在 Schedulers.io()onNext(T t) 方法内部观察,创建一个新的 Observable 来监听 MainThread ,如下所示:

    ParseObservable.find(myQuery)
        .map(myMapFunc())
        .subscribeOn(Schedulers.io())
        .subscribe(
            new Subscriber<MyObj>() {
               @Override
                public void onError(Throwable e) {
                    Log.e("error","error",e);
                }
    
                @Override
                public void onNext(T t) {
                    Observable.just(t)
                        .observeOn(AndroidSchedulers.mainThread())
                        .subscribe((t) -> {
                             // do something in MainThread
                        })
                }
    
                public void onCompleted() {
                    // ... worker thread again instead of mainThread
                }
            }
        )
    );
    

    我希望它有所帮助!

  • 6

    首先,您需要了解 subscribeOn()observeOn() 之间的区别 . 这两个完全不同的运算符会影响Rx链的不同部分 .

    subscribeOn() 指定Observable将在哪里工作 . 它不会影响执行 onNext()onError()onComplete() 的位置 .

    observeOn() 指定执行回调(例如 onNext() )的位置 . 它不会影响Observable的工作位置 .

    所有回调都将在同一个线程上发生 . 您不能指定某个回调在一个线程上发生,而某些回调在另一个线程上发生,通过任何RxJava API . 如果这是您想要的行为,您将不得不在回调中自己实现它 .

  • 2

    在这种情况下,我建议使用“侧面操作”操作符 . 在我看来,比使用嵌套的observable更优雅的解决方案:

    ParseObservable.find(myQuery)
            .map(myMapFunc())
            .subscribeOn(AndroidSchedulers.handlerThread(new Handler()))
            .doOnCompleted(() -> onCompleteAction())
            .observeOn(AndroidSchedulers.mainThread())
            .doOnNext(value -> onNext(value))
            .subscribe();
    
  • 2

    不建议在订阅中订阅 .

    subscribeOn 确定观察者订阅它时Observable链的起始位置 .

    observeOn 可以在整个可观察链中的不同点(多次,如果需要)使用,以在线程之间传递控制 . (您可以通过检查是否在主线程上或在每个块内都没有来验证这一点) .

    ParseObservable.find(myQuery)
        .map(myMapFunc())
    
        // Added this:
        .doOnNext(obj -> {
            // NOTE: This will happen on your `subscribeOn` scheduler 
            // Do something with `obj` here while on worker thread
        }
    
        .subscribeOn(AndroidSchedulers.handlerThread(new Handler()))
    
        // Added this:
        .observeOn(AndroidSchedulers.mainThread())    
    
        .subscribe(new Subscriber<>() {
            next -> {
                // NOTE: This will happen on the main thread
            },
            error -> {
                Log.e("error","error",e);
                // NOTE: This will happen on the main thread
            },
            () -> {
                // NOTE: This will happen on the main thread
            }
        });
    

相关问题