def sequenceOrBailOut[A, M[_] <: TraversableOnce[_]](in: M[Future[A]] with TraversableOnce[Future[A]])(implicit cbf: CanBuildFrom[M[Future[A]], A, M[A]], executor: ExecutionContext): Future[M[A]] = {
val p = Promise[M[A]]()
// the first Future to fail completes the promise
in.foreach(_.onFailure{case i => p.tryFailure(i)})
// if the whole sequence succeeds (i.e. no failures)
// then the promise is completed with the aggregated success
Future.sequence(in).foreach(p trySuccess _)
p.future
}
val f1 = Future { Thread.sleep(1000) ; 5 / 0 }
val f2 = Future { 5 }
val f3 = Future { None.get }
Future.sequence(List(f1,f2,f3)).onFailure{case i => println(i)}
// this waits one second, then prints "java.lang.ArithmeticException: / by zero"
// the first to fail in traversal order
和:
val f1 = Future { Thread.sleep(1000) ; 5 / 0 }
val f2 = Future { 5 }
val f3 = Future { None.get }
sequenceOrBailOut(List(f1,f2,f3)).onFailure{case i => println(i)}
// this immediately prints "java.util.NoSuchElementException: None.get"
// the 'actual' first to fail (usually...)
// and it returns early (it does not wait 1 sec)
7
这是一个不使用actor的解决方案 .
import scala.util._
import scala.concurrent._
import java.util.concurrent.atomic.AtomicInteger
// Nondeterministic.
// If any failure, return it immediately, else return the final success.
def allSucceed[T](fs: Future[T]*): Future[T] = {
val remaining = new AtomicInteger(fs.length)
val p = promise[T]
fs foreach {
_ onComplete {
case s @ Success(_) => {
if (remaining.decrementAndGet() == 0) {
// Arbitrarily return the final success
p tryComplete s
}
}
case f @ Failure(_) => {
p tryComplete f
}
}
}
p.future
}
class ResultCombiner(futs: Future[_]*) extends Actor {
var origSender: ActorRef = null
var futsRemaining: Set[Future[_]] = futs.toSet
override def receive = {
case () =>
origSender = sender
for(f <- futs)
f.onComplete(result => self ! if(result.isSuccess) f else false)
case false =>
origSender ! SomethingFailed
case f: Future[_] =>
futsRemaining -= f
if(futsRemaining.isEmpty) origSender ! EverythingSucceeded
}
}
sealed trait Result
case object SomethingFailed extends Result
case object EverythingSucceeded extends Result
然后,创建actor,向其发送消息(以便它知道将其回复发送到何处)并等待回复 .
val actor = actorSystem.actorOf(Props(new ResultCombiner(f1, f2, f3)))
try {
val f4: Future[Result] = actor ? ()
implicit val timeout = new Timeout(30 seconds) // or whatever
Await.result(f4, timeout.duration).asInstanceOf[Result] match {
case SomethingFailed => println("Oh noes!")
case EverythingSucceeded => println("It all worked!")
}
} finally {
// Avoid memory leaks: destroy the actor
actor ! PoisonPill
}
5
这个问题已得到解答,但我发布了我的 Value 类解决方案( Value 类在2.10中添加),因为这里没有一个 . 请随意批评 .
implicit class Sugar_PimpMyFuture[T](val self: Future[T]) extends AnyVal {
def concurrently = ConcurrentFuture(self)
}
case class ConcurrentFuture[A](future: Future[A]) extends AnyVal {
def map[B](f: Future[A] => Future[B]) : ConcurrentFuture[B] = ConcurrentFuture(f(future))
def flatMap[B](f: Future[A] => ConcurrentFuture[B]) : ConcurrentFuture[B] = concurrentFutureFlatMap(this, f) // work around no nested class in value class
}
def concurrentFutureFlatMap[A,B](outer: ConcurrentFuture[A], f: Future[A] => ConcurrentFuture[B]) : ConcurrentFuture[B] = {
val p = Promise[B]()
val inner = f(outer.future)
inner.future onFailure { case t => p.tryFailure(t) }
outer.future onFailure { case t => p.tryFailure(t) }
inner.future onSuccess { case b => p.trySuccess(b) }
ConcurrentFuture(p.future)
}
val l = List(1, 6, 8)
val f = l.map{
i => future {
println("future " +i)
Thread.sleep(i* 1000)
if (i == 12)
throw new Exception("6 is not legal.")
i
}
}
val f1 = Future.sequence(f)
f1 onSuccess{
case l => {
logInfo("onSuccess")
l.foreach(i => {
logInfo("h : " + i)
})
}
}
f1 onFailure{
case l => {
logInfo("onFailure")
}
8 回答
您可以使用for-comprehension,如下所示:
在这个例子中,期货1,2和3并行启动 . 然后,在理解中,我们等到结果1然后是2然后3可用 . 如果1或2失败,我们将不再等待3 . 如果所有3都成功,那么
aggFut
val将保持一个带有3个时隙的元组,对应于3个期货的结果 .现在如果你需要停止等待的行为,如果说fut2首先失败,事情会变得有点棘手 . 在上面的例子中,你必须等到fut1完成才能实现fut2失败 . 要解决这个问题,你可以尝试这样的事情:
现在这可以正常工作,但问题来自于知道哪一个
Future
在成功完成后从Map
中删除 . 只要你有一些方法可以将结果与产生该结果的Future进行正确关联,那么这样的方法就可以了 . 它只是递归地从 Map 中删除已完成的Futures,然后在剩余的Futures
上调用Future.firstCompletedOf
,直到没有剩下的为止,收集结果 . 它不漂亮,但如果你真的需要你正在谈论的行为,那么这个,或类似的东西可以工作 .您可以使用承诺,并向其发送第一次失败或最终完成的聚合成功:
然后你可以
Await
在那个结果Future
如果你想阻止,或者只是map
它变成别的东西 .与理解的区别在于,在这里你得到第一个失败的错误,而对于理解,你得到输入集合的遍历顺序中的第一个错误(即使另一个先失败) . 例如:
和:
这是一个不使用actor的解决方案 .
你可以单独使用期货来做到这一点 . 这是一个实现 . 请注意,它不会提前终止执行!在这种情况下,您需要做一些更复杂的事情(并且可能自己实施中断) . 但是如果你只是不想继续等待不起作用的东西,那么关键是要等待第一件事情完成,并在没有任何东西或者遇到异常时停止:
以下是一切正常的例子:
但是当出现问题时:
为此我会使用Akka演员 . 与for-comprehension不同,它会在任何期货失败时立即失败,因此在这个意义上它会更有效率 .
然后,创建actor,向其发送消息(以便它知道将其回复发送到何处)并等待回复 .
这个问题已得到解答,但我发布了我的 Value 类解决方案( Value 类在2.10中添加),因为这里没有一个 . 请随意批评 .
ConcurrentFuture是一个无开销的Future包装器,它将默认的Future map / flatMap从do-this-then-that更改为all-all-and-fail-if-any-fail . 用法:
在上面的示例中,f1,f2和f3将同时运行,如果任何顺序失败,元组的未来将立即失败 .
您可能想要查看Twitter的Future API . 特别是Future.collect方法 . 它完全符合您的要求:https://twitter.github.io/scala_school/finagle.html
源代码Future.scala可在此处获取:https://github.com/twitter/util/blob/master/util-core/src/main/scala/com/twitter/util/Future.scala
你可以用这个: