此Observable正在执行以下操作
-
给定源可观察
-
我们使用map来执行一些异步工作
-
我们使用concat按顺序返回异步工作的结果
以下是返回所需的结果,但我想并行启动异步工作 .
用Rx做这件事的正确方法是什么?
import RxSwift
func delay(time: Int, closure: () -> Void) {
dispatch_after(
dispatch_time(DISPATCH_TIME_NOW, Int64(time * Int(NSEC_PER_SEC))),
dispatch_get_main_queue(), closure)
}
func doAsyncWork(value: Int, desc: String, time: Int) -> Observable<Int> {
return Observable.create() { (observer) -> Disposable in
print(desc)
delay(time) {
observer.onNext(value)
observer.onCompleted()
}
return NopDisposable.instance
}
}
let seq = Observable
.of(1, 2, 3, 4, 5)
.map { (n) -> Observable<Int> in
return doAsyncWork(n,
desc: "start \(n) - wait \(5 - n)",
time: 6 - n
)
}
.concat()
let sharedSeq = seq.shareReplay(0)
sharedSeq.subscribeNext { print("=> \($0)") }
sharedSeq.subscribeCompleted { print("=> completed") }
这产生了
//start 1 - wait 4
// => 1
//start 2 - wait 3
// => 2
//start 3 - wait 2
// => 3
//start 4 - wait 1
// => 4
//start 5 - wait 0
// => 5
期望的输出将是
//start 1 - wait 4
//start 2 - wait 3
//start 3 - wait 2
//start 4 - wait 1
//start 5 - wait 0
// => 1
// => 2
// => 3
// => 4
// => 5
4 回答
这似乎不确定这是最好的答案
你的"desired output"似乎不同意你想让
Observable
开始"in parallel",但是延迟他们的元素,使"5"没有延迟,"4"有1秒延迟,"3"有2秒延迟,等等 .我认为你正在寻找这个输出:
这可以用来做到这一点:
如果您有其他意思,您可以轻松调整此片段以实现目标 .
这对你现在没有帮助,但也许它将来会帮助别人 .
您正在寻找的运营商称为
concatMap
. 但是,目前,它并不存在于RxSwift
中 .目前存在一个封闭的PR here .
之所以不能按预期工作,是因为
concat
一次订阅一个源可观察对象,等待第一个完成,然后才能订阅第二个,依此类推 .在RxJava中有
concatEager
,它可以满足您的需要 - 在开始时订阅所有源,同时仍然保留顺序 . 但似乎没有在Swift中 .你可以做的是用索引,flatMap,索引和解压缩对每个项目进行压缩 .