首页 文章

将许多未来[Seq]连接成一个未来[Seq]

提问于
浏览
2

如果没有Future,那就是我将所有较小的Seq组合成一个大的Seq与 flatmap

category.getCategoryUrlKey(id: Int):Seq[Meta] // main method
val appDomains: Seq[Int]

val categories:Seq[Meta] = appDomains.flatMap(category.getCategoryUrlKey(_))

现在方法 getCategoryUrlKey 可能会失败 . 我在前面放了一个断路器,以避免在 maxFailures 之后为下一个元件调用它 . 现在断路器没有返回 Seq 但是 Future[Seq]

lazy val breaker = new akka.pattern.CircuitBreaker(...)

private def getMeta(appDomainId: Int): Future[Seq[Meta]] = {
  breaker.withCircuitBreaker {
    category.getCategoryUrlKey(appDomainId)
  }
}

如何遍历List appDomains 并将结果合并到一个Future [Seq]中,可能进入Seq?

如果函数式编程适用,有没有办法在没有临时变量的情况下直接转换?

3 回答

  • 0

    使用Future.sequence压制期货的seq

    Future.sequenceSeq[Future[T]] 转换为 Future[Seq[T]]

    在你的情况下 TSeq . 在序列操作之后,您将得到Seq [Seq [T]] . 因此,在使用展平的顺序操作之后,将其展平 .

    def squashFutures[T](list: Seq[Future[Seq[T]]]): Future[Seq[T]] =
      Future.sequence(list).map(_.flatten)
    

    你的代码变成了

    Future.sequence(appDomains.map(getMeta)).map(_.flatten)
    
  • 1

    从TraversableOnce [Future [A]]到Future [TraversableOnce [A]]

    val categories = Future.successful(appDomains).flatMap(seq => {
        val fs = seq.map(i => getMeta(i))
        val sequenced = Future.sequence(fs)
        sequenced.map(_.flatten)
    })
    
    • Future.successful(appDomains)appDomains 提升到 Future 的上下文中

    希望这可以帮助 .

  • 4
    val metaSeqFutureSeq = appDomains.map(i => getMeta(i))
    // Seq[Future[Seq[Meta]]]
    
    val metaSeqSeqFuture = Future.sequence(metaSeqFutureSeq)
    // Future[Seq[Seq[Meta]]]
    // NOTE :: this future will fail if any of the futures in the sequence fails
    
    val metaSeqFuture = metaSeqSeqFuture.map(seq => seq.flatten)
    // Future[Seq[Meta]]
    

    如果你想拒绝唯一失败的未来但保留成功的未来,那么我们必须有点创造力并使用承诺 Build 我们的未来 .

    import java.util.concurrent.locks.ReentrantLock
    
    import scala.collection.mutable.ArrayBuffer
    import scala.concurrent.{Future, Promise}
    import scala.util.{Failure, Success}
    
    def futureSeqToOptionSeqFuture[T](futureSeq: Seq[Future[T]]): Future[Seq[Option[T]]] = {
      val promise = Promise[Seq[Option[T]]]()
    
      var remaining = futureSeq.length
    
      val result = ArrayBuffer[Option[T]]()
      result ++ futureSeq.map(_ => None)
    
      val resultLock = new ReentrantLock()
    
      def handleFutureResult(option: Option[T], index: Int): Unit = {
        resultLock.lock()
        result(index) = option
        remaining = remaining - 1
        if (remaining == 0) {
          promise.success(result)
        }
        resultLock.unlock()
      }
    
      futureSeq.zipWithIndex.foreach({ case (future, index) => future.onComplete({
        case Success(t) => handleFutureResult(Some(t), index)
        case Failure(ex) => handleFutureResult(None, index)
      }) })
    
      promise.future
    }
    
    val metaSeqFutureSeq = appDomains.map(i => getMeta(i))
    // Seq[Future[Seq[Meta]]]
    
    val metaSeqOptionSeqFuture = futureSeqToOptionSeqFuture(metaSeqFutureSeq)
    // Future[Seq[Option[Seq[Meta]]]]
    
    val metaSeqFuture = metaSeqSeqFuture.map(seq => seq.flatten.flatten)
    // Future[Seq[Meta]]
    

相关问题