首页 文章

在akka-stream中如何从期货集合中创建无序的来源

提问于
浏览
9

我需要从 Future[T] 的集合中创建 akka.stream.scaladsl.Source[T, Unit] .

例如,有一组期货返回整数,

val f1: Future[Int] = ???
val f2: Future[Int] = ???
val fN: Future[Int] = ???
val futures = List(f1, f2, fN)

如何创建一个

val source: Source[Int, Unit] = ???

从中 .

我不能使用 Future.sequence 组合器,从那时起我会等待每个未来完成之后从源头获取任何东西 . 我想在任何未来完成后立即以任何顺序获得结果 .

我知道 Source 是一个纯粹的功能API,它不应该以某种方式实现它之前运行任何东西 . 所以,我的想法是使用 Iterator (这是懒惰的)来创建一个源:

Source { () =>
  new Iterator[Future[Int]] {
    override def hasNext: Boolean = ???
    override def next(): Future[Int] = ???
  }
}

但这将是未来的来源,而不是实际 Value . 我也可以使用 Await.result(future) 阻止 next ,但我将阻止我的线程 . 这也将顺序调用期货,而我需要并行执行 .

UPDATE 2 :事实证明有一种更简单的方法(感谢Viktor Klang):

Source(futures).mapAsync(1)(identity)

UPDATE :这是基于@sschaef的答案:

def futuresToSource[T](futures: Iterable[Future[T]])(implicit ec: ExecutionContext): Source[T, Unit] = {
  def run(actor: ActorRef): Unit = {
    futures.foreach { future =>
      future.onComplete {
        case Success(value) =>
          actor ! value
        case Failure(NonFatal(t)) =>
          actor ! Status.Failure(t) // to signal error
      }
    }

    Future.sequence(futures).onSuccess { case _ =>
      actor ! Status.Success(()) // to signal stream's end
    }
  }

  Source.actorRef[T](futures.size, OverflowStrategy.fail).mapMaterializedValue(run)
}

// ScalaTest tests follow

import scala.concurrent.ExecutionContext.Implicits.global

implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()

"futuresToSource" should "convert futures collection to akka-stream source" in {
  val f1 = Future(1)
  val f2 = Future(2)
  val f3 = Future(3)

  whenReady {
    futuresToSource(List(f1, f2, f3)).runFold(Seq.empty[Int])(_ :+ _)
  } { results =>
    results should contain theSameElementsAs Seq(1, 2, 3)
  }
}

it should "fail on future failure" in {
  val f1 = Future(1)
  val f2 = Future(2)
  val f3 = Future.failed(new RuntimeException("future failed"))

  whenReady {
    futuresToSource(List(f1, f2, f3)).runWith(Sink.ignore).failed
  } { t =>
    t shouldBe a [RuntimeException]
    t should have message "future failed"
  }
}

2 回答

  • 6

    创建Futures的来源,然后通过mapAsync“展平”它:

    scala> Source(List(f1,f2,fN)).mapAsync(1)(identity)
    res0: akka.stream.scaladsl.Source[Int,Unit] = akka.stream.scaladsl.Source@3e10d804
    
  • 5

    提供Source的最简单方法之一是通过Actor:

    import scala.concurrent.Future
    import akka.actor._
    import akka.stream._
    import akka.stream.scaladsl._
    
    implicit val system = ActorSystem("MySystem")
    
    def run(actor: ActorRef): Unit = {
      import system.dispatcher
      Future { Thread.sleep(100); actor ! 1 }
      Future { Thread.sleep(200); actor ! 2 }
      Future { Thread.sleep(300); actor ! 3 }
    }
    
    val source = Source
      .actorRef[Int](0, OverflowStrategy.fail)
      .mapMaterializedValue(ref ⇒ run(ref))
    implicit val m = ActorMaterializer()
    
    source runForeach { int ⇒
      println(s"received: $int")
    }
    

    Actor是通过 Source.actorRef 方法创建的,并通过 mapMaterializedValue 方法提供 . run 只需获取Actor并将所有已完成的值发送给它,然后可以通过 source 访问它 . 在上面的示例中,值直接在Future中发送,但这当然可以在任何地方完成(例如,在对Future的 onComplete 调用中) .

相关问题