首页 文章

RxJava onErrorResumeNext()

提问于
浏览
8

我有两个可观察对象(简称为A和B)和一个用户 . 因此,订阅者订阅A,如果A上有错误,那么B(这是后备)就会启动 . 现在,每当A遇到错误B被称为罚款,但是A调用订阅者的onComplete(),所以B响应即使B执行成功,也永远不会到达订户 .

这是正常行为吗?我认为onErrorResumeNext()应该继续流并在完成后通知订阅者,如文档(https://github.com/ReactiveX/RxJava/wiki/Error-Handling-Operators#onerrorresumenext)中所述 .

这是我正在做的整体结构(省略了几个“无聊的”代码):

public Observable<ModelA> observeGetAPI(){
    return retrofitAPI.getObservableAPI1()
            .flatMap(observableApi1Response -> {
                ModelA model = new ModelA();

                model.setApi1Response(observableApi1Response);

                return retrofitAPI.getObservableAPI2()
                        .map(observableApi2Response -> {
                            // Blah blah blah...
                            return model;
                        })
                        .onErrorResumeNext(observeGetAPIFallback(model))
                        .subscribeOn(Schedulers.newThread())
            })
            .onErrorReturn(throwable -> {
                // Blah blah blah...
                return model;
            })
            .subscribeOn(Schedulers.newThread());
}

private Observable<ModelA> observeGetAPIFallback(ModelA model){
    return retrofitAPI.getObservableAPI3().map(observableApi3Response -> {
        // Blah blah blah...
        return model;
    }).onErrorReturn(throwable -> {
        // Blah blah blah...
        return model;
    })
    .subscribeOn(Schedulers.immediate());
}

Subscription subscription;
subscription = observeGetAPI.subscribe(ModelA -> {
    // IF THERE'S AN ERROR WE NEVER GET B RESPONSE HERE...
}, throwable ->{
    // WE NEVER GET HERE... onErrorResumeNext()
},
() -> { // IN CASE OF AN ERROR WE GET STRAIGHT HERE, MEANWHILE, B GETS EXECUTED }
);

我有什么想法我做错了吗?

谢谢!

EDIT: 这里's a rough timeline of what'发生了:

---> HTTP GET REQUEST B
<--- HTTP 200 REQUEST B RESPONSE (SUCCESS)

---> HTTP GET REQUEST A
<--- HTTP 200 REQUEST A RESPONSE (FAILURE!)

---> HTTP GET FALLBACK A
** onComplete() called! ---> Subscriber never gets fallback response since onComplete() gets called before time.
<--- HTTP 200 FALLBACK A RESPONSE (SUCCESS)

在这里's a link to a simple diagram I made which represent'我想要发生的事情:Diagram

1 回答

  • 6

    以下使用的Rx调用应模拟您使用Retrofit进行的操作 .

    fallbackObservable =
            Observable
                    .create(new Observable.OnSubscribe<String>() {
                        @Override
                        public void call(Subscriber<? super String> subscriber) {
                            logger.v("emitting A Fallback");
                            subscriber.onNext("A Fallback");
                            subscriber.onCompleted();
                        }
                    })
                    .delay(1, TimeUnit.SECONDS)
                    .onErrorReturn(new Func1<Throwable, String>() {
                        @Override
                        public String call(Throwable throwable) {
                            logger.v("emitting Fallback Error");
                            return "Fallback Error";
                        }
                    })
                    .subscribeOn(Schedulers.immediate());
    
    stringObservable =
            Observable
                    .create(new Observable.OnSubscribe<String>() {
                        @Override
                        public void call(Subscriber<? super String> subscriber) {
                            logger.v("emitting B");
                            subscriber.onNext("B");
                            subscriber.onCompleted();
                        }
                    })
                    .delay(1, TimeUnit.SECONDS)
                    .flatMap(new Func1<String, Observable<String>>() {
                        @Override
                        public Observable<String> call(String s) {
                            logger.v("flatMapping B");
                            return Observable
                                    .create(new Observable.OnSubscribe<String>() {
                                        @Override
                                        public void call(Subscriber<? super String> subscriber) {
                                            logger.v("emitting A");
                                            subscriber.onNext("A");
                                            subscriber.onCompleted();
                                        }
                                    })
                                    .delay(1, TimeUnit.SECONDS)
                                    .map(new Func1<String, String>() {
                                        @Override
                                        public String call(String s) {
                                            logger.v("A completes but contains invalid data - throwing error");
                                            throw new NotImplementedException("YUCK!");
                                        }
                                    })
                                    .onErrorResumeNext(fallbackObservable)
                                    .subscribeOn(Schedulers.newThread());
                        }
                    })
                    .onErrorReturn(new Func1<Throwable, String>() {
                        @Override
                        public String call(Throwable throwable) {
                            logger.v("emitting Return Error");
                            return "Return Error";
                        }
                    })
                    .subscribeOn(Schedulers.newThread());
    
    subscription = stringObservable.subscribe(
            new Action1<String>() {
                @Override
                public void call(String s) {
                    logger.v("onNext " + s);
                }
            },
            new Action1<Throwable>() {
                @Override
                public void call(Throwable throwable) {
                    logger.v("onError");
                }
            },
            new Action0() {
                @Override
                public void call() {
                    logger.v("onCompleted");
                }
            });
    

    日志语句的输出是:

    RxNewThreadScheduler-1 emitting B
    RxComputationThreadPool-1 flatMapping B
    RxNewThreadScheduler-2 emitting A
    RxComputationThreadPool-2 A completes but contains invalid data - throwing error
    RxComputationThreadPool-2 emitting A Fallback
    RxComputationThreadPool-1 onNext A Fallback
    RxComputationThreadPool-1 onCompleted
    

    这看起来像你在寻找,但也许我错过了一些东西 .

相关问题