首页 文章

RxJava改造长轮询

提问于
浏览
20

我的问题是我无法通过 Retrofit 获得无限流 . 在我获得初始poll()请求的凭证后 - 我做了初始poll()请求 . 如果没有更改,则每个poll()请求在25秒内响应,如果有任何更改,则每个poll()请求更早 - 返回changed_data [] . 每个响应包含下一个轮询请求所需的 timestamp 数据 - 我应该在每个poll()响应之后执行新的poll()请求 . 这是我的代码:

getServerApi().getLongPollServer() 
  .flatMap(longPollServer -> getLongPollServerApi(longPollServer.getServer()).poll("a_check", Config.LONG_POLLING_SERVER_TIMEOUT, 2, longPollServer.getKey(), longPollServer.getTs(), "") 
   .take(1) 
   .flatMap(longPollEnvelope -> getLongPollServerApi(longPollServer.getServer()).poll("a_check", Config.LONG_POLLING_SERVER_TIMEOUT, 2, longPollServer.getKey(), longPollEnvelope.getTs(), ""))) 
   .retry()
   .subscribe(longPollEnvelope1 -> {
   processUpdates(longPollEnvelope1.getUpdates());
});

我是RxJava的新手,也许我不明白,但我无法获得无限的流 . 我接到3个电话,然后是onNext和onComplete .

附:也许有更好的解决方案在Android上实现长轮询?

1 回答

  • 11

    虽然不理想,但我相信你可以使用RX的副作用来达到预期的效果('doOn'操作) .

    Observable<CredentialsWithTimestamp> credentialsProvider = Observable.just(new CredentialsWithTimestamp("credentials", 1434873025320L)); // replace with your implementation
    
    Observable<ServerResponse> o = credentialsProvider.flatMap(credentialsWithTimestamp -> {
        // side effect variable
        AtomicLong timestamp = new AtomicLong(credentialsWithTimestamp.timestamp); // computational steering (inc. initial value)
        return Observable.just(credentialsWithTimestamp.credentials) // same credentials are reused for each request - if invalid / onError, the later retry() will be called for new credentials
                .flatMap(credentials -> api.query("request", credentials, timestamp.get()))  // this will use the value from previous doOnNext
                .doOnNext(serverResponse -> timestamp.set(serverResponse.getTimestamp()))
                .repeat();
    })
            .retry()
            .share();
    
    private static class CredentialsWithTimestamp {
    
        public final String credentials;
        public final long timestamp; // I assume this is necessary for you from the first request
    
        public CredentialsWithTimestamp(String credentials, long timestamp) {
            this.credentials = credentials;
            this.timestamp = timestamp;
        }
    }
    

    订阅'o'时,内部observable将重复 . 如果出现错误,则“o”将重试并从凭证流重新请求 .

    在您的示例中,通过更新timestamp变量来实现计算转向,这是下一个请求所必需的 .

相关问题