首页 文章

Rxswift Map 并行连接

提问于
浏览
2

此Observable正在执行以下操作

  • 给定源可观察

  • 我们使用map来执行一些异步工作

  • 我们使用concat按顺序返回异步工作的结果

以下是返回所需的结果,但我想并行启动异步工作 .

用Rx做这件事的正确方法是什么?

import RxSwift

func delay(time: Int, closure: () -> Void) {
  dispatch_after(
    dispatch_time(DISPATCH_TIME_NOW, Int64(time * Int(NSEC_PER_SEC))),
    dispatch_get_main_queue(), closure)
}

func doAsyncWork(value: Int, desc: String, time: Int) -> Observable<Int> {
  return Observable.create() { (observer) -> Disposable in
    print(desc)
    delay(time) {
      observer.onNext(value)
      observer.onCompleted()
    }
    return NopDisposable.instance
  }
}

let seq = Observable
  .of(1, 2, 3, 4, 5)
  .map { (n) -> Observable<Int> in
    return doAsyncWork(n,
      desc: "start \(n) - wait \(5 - n)",
      time: 6 - n
    )
  }
  .concat()

let sharedSeq = seq.shareReplay(0)
sharedSeq.subscribeNext { print("=> \($0)") }
sharedSeq.subscribeCompleted { print("=> completed") }

这产生了

//start 1 - wait 4
// => 1
//start 2 - wait 3
// => 2
//start 3 - wait 2
// => 3
//start 4 - wait 1
// => 4
//start 5 - wait 0
// => 5

期望的输出将是

//start 1 - wait 4
//start 2 - wait 3
//start 3 - wait 2
//start 4 - wait 1
//start 5 - wait 0
// => 1
// => 2
// => 3
// => 4
// => 5

4 回答

  • 1

    这似乎不确定这是最好的答案

    import RxSwift
    
    func delay(time: Int, closure: () -> Void) {
      dispatch_after(
        dispatch_time(DISPATCH_TIME_NOW, Int64(time * Int(NSEC_PER_SEC))),
        dispatch_get_main_queue(), closure)
    }
    
    func doAsyncWork(value: Int, desc: String, time: Int) -> Observable<Int> {
      return Observable.create() { (observer) -> Disposable in
        print(desc)
        delay(time) {
          observer.onNext(value)
          observer.onCompleted()
        }
        return NopDisposable.instance
      }
    }
    
    let seq = Observable
      .of(1, 2, 3, 4, 5)
      .map { (n) -> Observable<Int> in
        let o = doAsyncWork(n,
          desc: "start \(n) - wait \(5 - n)",
          time: 6 - n
        ).shareReplay(1)
        o.subscribe()
        return o.asObservable()
      }
      .concat()
    
    let sharedSeq = seq.shareReplay(0)
    sharedSeq.subscribeNext { print("=> \($0)") }
    sharedSeq.subscribeCompleted { print("=> completed") }
    
  • 0

    你的"desired output"似乎不同意你想让 Observable 开始"in parallel",但是延迟他们的元素,使"5"没有延迟,"4"有1秒延迟,"3"有2秒延迟,等等 .

    我认为你正在寻找这个输出:

    start 1 - wait 4
    start 2 - wait 3
    start 3 - wait 2
    start 4 - wait 1
    start 5 - wait 0
    5
    4
    3
    2
    1
    

    这可以用来做到这一点:

    Observable.range(start: 1, count: 5)
        .flatMap { n -> Observable<Int> in
            let waitInterval = 5 - n
            print("start \(n) - wait \(waitInterval)")
            return Observable.just(n)
                .delaySubscription(RxTimeInterval(waitInterval), scheduler: MainScheduler.instance)
        }
        .subscribeNext { i in
            print(i)
        }
        .addDisposableTo(disposeBag)
    

    如果您有其他意思,您可以轻松调整此片段以实现目标 .

  • 0

    这对你现在没有帮助,但也许它将来会帮助别人 .

    您正在寻找的运营商称为 concatMap . 但是,目前,它并不存在于 RxSwift 中 .

    目前存在一个封闭的PR here .

  • 2

    之所以不能按预期工作,是因为 concat 一次订阅一个源可观察对象,等待第一个完成,然后才能订阅第二个,依此类推 .

    在RxJava中有 concatEager ,它可以满足您的需要 - 在开始时订阅所有源,同时仍然保留顺序 . 但似乎没有在Swift中 .

    你可以做的是用索引,flatMap,索引和解压缩对每个项目进行压缩 .

相关问题