情况:
有许多阻塞同步调用(这是一个无法更改的给定),这可能需要很长时间才能聚合结果 .
目标:
使呼叫无阻塞,然后等待最长时间(ms)并收集所有已成功的呼叫,即使有些可能因为超时而失败(因此我们可以降低失败呼叫的功能) .
当前解决方案:
下面的解决方案通过组合期货来工作,等待那个期货完成或超时,并且在NonFatal错误(超时)的情况下,它使用 completedFutureValues
方法提取成功完成的期货 .
import scala.concurrent.{Await, Future}
import scala.util.Random._
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Failure, Success}
import scala.util.control.NonFatal
def potentialLongBlockingHelloWorld(i: Int): String = {Thread.sleep(nextInt(500)); s"hello world $i" }
// use the same method 3 times, but in reality is different methods (with different types)
val futureHelloWorld1 = Future(potentialLongBlockingHelloWorld(1))
val futureHelloWorld2 = Future(potentialLongBlockingHelloWorld(2))
val futureHelloWorld3 = Future(potentialLongBlockingHelloWorld(3))
val combinedFuture: Future[(String, String, String)] = for {
hw1 <- futureHelloWorld1
hw2 <- futureHelloWorld2
hw3 <- futureHelloWorld3
} yield (hw1, hw2, hw3)
val res = try {
Await.result(combinedFuture, 250.milliseconds)
} catch {
case NonFatal(_) => {
(
completedFutureValue(futureHelloWorld1, "fallback hello world 1"),
completedFutureValue(futureHelloWorld2, "fallback hello world 2"),
completedFutureValue(futureHelloWorld3, "fallback hello world 3")
)
}
}
def completedFutureValue[T](future: Future[T], fallback: T): T =
future.value match {
case Some(Success(value)) => value
case Some(Failure(e)) =>
fallback
case None =>
fallback
}
它将返回tuple3,其中包含已完成的未来结果或后备,例如: (hello world,fallback hello world 2,fallback hello world 3)
虽然这有效,但我对此并不特别满意 .
问题:
我们怎样才能改进这一点?
5 回答
如果我也可以建议一种方法 . 想法是避免一起阻塞,并在每个未来实际设置超时 . 这是一篇博文,我发现在做我的例子时非常有用,它有点旧,但是黄金的东西:
https://nami.me/2015/01/20/scala-futures-with-timeout/
这里的一个负面观点是你可能需要在解决方案中添加akka,但是再次它并不完全丑陋:
使用这种方法没有阻塞,你在每个阶段都有一个超时,这样你就可以微调并适应你真正需要的东西 . 我正在使用的只是为了展示它是如何工作的 .
为什么不写:
???
在这里发布由同事提供的解决方案,该解决方案基本上与问题中提供的解决方案相同,但使其更加清洁 .
使用他的解决方案可以写:
这可以使用期货元组和后备 . 使这成为可能的代码:
可能最好使用Future.sequence()从Collection [Future]返回Future [Collection]
一旦(据我所知)你将阻止当前线程并同步等待结果,我会说最简单的解决方案应该是:
在这里,您只需在AtomicReferences中引入结果,这些结果将在未来完成时更新,并检查所有期货已完成或最多250毫秒(超时)的刻度结果 .
或者,您可以使用回退和超时从here extend获得
Future with timeout
实现,而不仅仅使用Future.sequence
和Await,并保证所有Futures
将在成功或回退时及时完成 .