首页 文章

RxJava subscribeOn和observeOn不会覆盖之前设置的原始Scheduler?

提问于
浏览
4

我在Android中使用了RxJava和Retrofit 2,我在subscribe()之前调用了subscribeOn(Schedulers.io())android observeOn(AndroidSchedulers.mainThread())全局 . 但是,有时我想调用subscribeOn(Schedulers.immediate())android observeOn(Schedulers.immediate())来覆盖Scheduler之前设置的同步进程 . 但我发现它不起作用,android工作仍然会在io()线程上处理,android结果由mainThread()处理 . 为什么?

1 回答

  • 6

    这就是RxJava的工作方式 .

    看看this video tutorial,从12:50开始 . 因此,视频中的示例:

    Observable.just(1, 2, 3)
        .subscribeOn(Schedulers.newThread())
        .subscribeOn(Schedulers.io())
        .subscribe(System.out::println);
    

    会发生什么事情 subscribeOn() 嵌套所有电话 . 在这种情况下,首先生成 subscribeOn(Schedulers.io()) 并在io线程上订阅它上面的所有内容 . 但是接下来会生成 subscribeOn(Schedulers.newThread()) 并且它会优先(因为它被称为最后一个)来订阅它上面的所有内容 . 没有 Build 一个线程链 . 在这个例子中,你基本上没有任何理由产生io线程 .

    为了更好地处理 subscribeOn()observeOn() 方法,我建议你看一下同一位视频作者的this post . 他建议的是使用 Transformer 来包含对这些方法的调用:

    Transformer实际上只是Func1 <Observable <T>,Observable <R >> . 换句话说:为它提供一种类型的Observable,它将返回另一种Observable . 这与内联调用一系列运算符完全相同 .

    这样,您可以使用如下方法:

    <T> Transformer<T, T> applySchedulers() {  
        return observable -> observable.subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread());
    }
    

    或者,如果要重用变换器,可以进行以下设置:

    final Transformer schedulersTransformer =  
        observable -> observable.subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread());
    
    @SuppressWarnings("unchecked")
    <T> Transformer<T, T> applySchedulers() {  
        return (Transformer<T, T>) schedulersTransformer;
    }
    

    然后上面的例子看起来像:

    Observable.just(1, 2, 3)
        .compose(applySchedulers())
        .subscribe(System.out::println);
    

    希望有所帮助 .

相关问题