首页 文章

我可以有条件地“合并”一个单一的Observable吗?

提问于
浏览
0

我是一个RxJava新人,我在如何做以下事情时遇到了一些麻烦 .

  • 我正在使用Retrofit来调用一个网络请求,该请求返回 Single<Foo> ,这是我最终想要通过我的Subscriber实例消费的类型(称之为 SingleFooSubscriber

  • Foo 有一个内部属性 items ,类型为 List<String> .

  • 如果 Foo.items 不为空,我想为每个值调用单独的并发网络请求 . (这些请求的实际结果对于 SingleFooSubscriber 无关紧要,因为结果将在外部缓存) .
    只有在获取 Foo 和所有 Foo.items 时才应调用

  • SingleFooSubscriber.onComplete() .

fetchFooCall .subscribeOn(Schedulers.io())

// Approach #1...
// the idea here would be to "merge" the results of both streams into a single
// reactive type, but i'm not sure how this would work given that the item emissions
// could be far greater than one. using zip here i don't think it would every 
// complete.

.flatMap { foo ->
    if(foo.items.isNotEmpty()) {
        Observable.zip(
                Observable.fromIterable(foo.items),
                Observable.just(foo),
                { source1, source2 -> 
                    // hmmmm...
                }
        ).toSingle()

    } else {
        Single.just(foo)
    }
}

// ...or Approach #2...
// i think this would result in the streams for Foo and items being handled sequentially,
// which is not really ideal because
// 1) i think it would entail nested streams (i get the feeling i should be using flatMap 
//    instead)
// 2) and i'm not sure SingleFooSubscriber.onComplete() would depend on the completion of
//    the stream for items

.doOnSuccess { data ->
    if(data.items.isNotEmpty()) {
        // hmmmm...
    }
}

.observeOn(AndroidSchedulers.mainThread())
.subscribe(
    { data  -> /* onSuccess() */ },
    { error -> /* onError()   */ }
)

任何关于如何处理这个问题的想法将不胜感激!

奖励积分:在尝试提出解决方案时,我开始质疑使用 Single 反应类型与 Observable 反应类型的决定 . 我的数据流中的大多数(除了这一个 Foo.items case?)实际上都围绕着消费单个实例,所以我倾向于单一来表示我的流,因为我认为它会在代码周围添加一些语义清晰度 . 任何人都有关于何时使用一个与另一个的一般指导?

1 回答

  • 2

    您需要嵌套 flatMap 然后转换回 Single

    retrofit.getMainObject()
      .flatMap(v ->
         Flowable.fromIterable(v.items)
           .flatMap(w ->
             retrofit.getItem(w.id).doOnNext(x -> w.property = x)
           )
           .ignoreElements()
           .toSingle(v)
    )
    

相关问题