首页 文章

在RxJava中使用“skipWhile”和“repeatWhen”来实现服务器轮询

提问于
浏览
7

我真的很喜欢RxJava,它是一个很棒的工具,但有些时候很难理解它是如何工作的 . 我们在Android项目中使用Retrofit和RxJava,并且有以下用例:

我需要轮询服务器,重试之间有一些延迟,而服务器正在做一些工作 . 服务器完成后,我必须提供结果 . 所以我用RxJava成功完成了它,这里是代码片段:我用“skipWhile”和“repeatWhen”

Subscription checkJobSubscription = mDataManager.checkJob(prepareTweakJob)
        .skipWhile(new Func1<CheckJobResponse, Boolean>() {

            @Override
            public Boolean call(CheckJobResponse checkJobResponse) {
                boolean shouldSkip = false;

                if (SHOW_LOGS) Logger.v(TAG, "checkJob, skipWhile, jobStatus " + checkJobResponse.getJobStatus());

                switch (checkJobResponse.getJobStatus()){
                    case CheckJobResponse.PROCESSING:
                        shouldSkip = true;
                        break;
                    case CheckJobResponse.DONE:
                    case CheckJobResponse.ERROR:
                        shouldSkip = false;
                        break;
                }
                if (SHOW_LOGS) Logger.v(TAG, "checkJob, skipWhile, shouldSkip " + shouldSkip);

                return shouldSkip;
            }
        })
        .repeatWhen(new Func1<Observable<? extends Void>, Observable<?>>() {
            @Override
            public Observable<?> call(Observable<? extends Void> observable) {
                if (SHOW_LOGS) Logger.v(TAG, "checkJob, repeatWhen " + observable);
                return observable.delay(1, TimeUnit.SECONDS);
            }
        }).subscribe(new Subscriber<CheckJobResponse>(){
            @Override
            public void onNext(CheckJobResponse response) {
                if (SHOW_LOGS) Logger.v(TAG, "checkJob, onSuccess, response " + response);

            }

            @Override
            public void onError(BaseError error) {
                if (SHOW_LOGS) Logger.v(TAG, "checkJob, onError, canEditTimeline, error " + error);
                Toast.makeText(ChoseEditOptionActivity.this, R.string.NETWORK__no_internet_message, Toast.LENGTH_LONG).show();

            }

            @Override
            public void onCompleted() {
                if (SHOW_LOGS) Logger.v(TAG, "onCompleted");
            }
        });

代码工作正常:

当服务器响应该作业正在处理时,我从“skipWhile”链返回“true”,原始的Observable等待1秒并再次执行http请求 . 重复此过程,直到我从“skipWhile”链返回“false” .

这是我不明白的一些事情:

  • 我在“skipWhile”的文档中看到它不会从原始的Observable发出任何内容(onError,onNext,onComplete),直到我从“call”方法返回“false” . 所以如果它没有发出任何东西为什么“repeatWhen”Observable做它的工作?它等待一秒钟并再次运行请求 . 是谁推出的?

  • 第二个问题是:为什么“repeatWhen”中的Observable不能永远运行,我的意思是为什么当我从“skipWhile”返回“false”时它会停止重复?如果我返回“false”,我会在订阅者中成功获取onNext .

  • 在“repeatWhile”的文档中,它说我最终在订阅者中调用了“onComplete”,但从未调用过“onComplete” .

  • 如果我改变链接“skipWhile”和“repeatWhen”的顺序没有区别 . 这是为什么 ?

我知道RxJava是开源的,我只能阅读代码,但正如我所说 - 这真的很难理解 .

谢谢 .

1 回答

  • 14

    我之前没有和 repeatWhen 合作,但是这个问题让我好奇,所以我做了一些研究 .

    skipWhile 会发出 onErroronCompleted ,即使它在此之前永远不会返回 true . 因此,每次 checkJob() 发出 onCompleted 时都会调用 repeatWhen . 这回答了问题#1 .

    其余的问题都是基于错误的假设 . 您的订阅永远在运行,因为 repeatWhen 永远不会终止 . 那是因为 repeatWhen 是一个比你意识到的更复杂的野兽 . 只要从源获取 onCompletedObservable 就会发出 null . 如果你接受并返回 onCompleted 然后它结束,否则如果你发出任何东西它重试 . 由于 delay 只需要一个发射并延迟它,它总是再次发射 null . 因此,它不断重新订阅 .

    那么#2的答案就是它永远在运行;您可能正在执行此代码之外的其他操作来取消订阅 . 对于#3,你永远不会得到 onCompleted ,因为它永远不会完成 . 对于#4,订单不会无限期地重复 .

    现在的问题是,你如何得到正确的行为?它就像使用 takeUntil 而不是 skipWhile 一样简单 . 这样,你不断重复,直到得到你想要的结果,从而在你希望它结束时终止流 .

    这是一个代码示例:

    Observable<Boolean> source = ...; // Something that eventually emits true
    
    source
        .repeatWhen(completed -> completed.delay(1, TimeUnit.SECONDS))
        .takeUntil(result -> result)
        .filter(result -> result)
        .subscribe(
            res -> System.out.println("onNext(" + res + ")"),
            err -> System.out.println("onError()"),
            () -> System.out.println("onCompleted()")
        );
    

    在这个例子中, source 正在发出布尔值 . 我每1秒重复一次,直到源发出 true . 我一直在服用,直到 resulttrue . 并且我过滤掉所有 false 的通知,因此订户不会't get them until it' s true .

相关问题