首页 文章

Scala:将[未来]列入未来[List],忽略失败的未来

提问于
浏览
98

我使用Playframework,所以最终,我真正想要的是 Future[Result] ,但是为了简单起见,让我们说 Future[List[Int]] 这样做的正常方法是使用 Future.sequence(...) 但是's a twist... The list I' m通常有大约10-20个期货在它中,它希望能够获得那些成功并返回那些的东西 .

例如,执行以下操作不起作用

import scala.concurrent._
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.Success
import scala.util.Failure

val listOfFutures = Future.successful(1) :: Future.failed(new Exception("Failure")) :: 
                    Future.successful(3) :: Nil

val futureOfList = Future.sequence(listOfFutures)

futureOfList onComplete {
  case Success(x) => println("Success!!! " + x)
  case Failure(ex) => println("Failed !!! " + ex)
}

scala> Failed !!! java.lang.Exception: Failure

我没有得到唯一的例外,而是希望能够将1和3拉出来 . 我尝试使用 Future.fold ,但显然只是在幕后调用 Future.sequence .

在此先感谢您的帮助!

5 回答

  • 9

    诀窍是首先确保没有一个未来失败 . .recover 在这里是你的朋友,你可以将它与 map 结合起来将所有的 Future[T] 结果转换为 Future[Try[T]]] 个实例,所有这些都肯定是成功的期货 .

    注意:您也可以使用 OptionEither ,但如果您特别想要捕获异常, Try 是最干净的方法

    def futureToFutureTry[T](f: Future[T]): Future[Try[T]] =
      f.map(Success(_)).recover(x => Failure(x))
    
    val listOfFutures = ...
    val listOfFutureTrys = listOfFutures.map(futureToFutureTry(_))
    

    然后像以前一样使用 Future.sequence ,给你一个 Future[List[Try[T]]]

    val futureListOfTrys = Future.sequence(listOfFutureTrys)
    

    然后筛选:

    val futureListOfSuccesses = futureListOfTrys.map(_.filter(_.isSuccess))
    

    如果需要,您甚至可以提取特定的故障:

    val futureListOfFailures = futureListOfTrys.map(_.filter(_.isFailure))
    
  • 117

    我试过凯文的答案,我在我的Scala版本(2.11.5)上遇到了一个小故障......我纠正了这个问题,如果有人有兴趣的话还写了一些额外的测试......这是我的版本>

    implicit class FutureCompanionOps(val f: Future.type) extends AnyVal {
    
        /** Given a list of futures `fs`, returns the future holding the list of Try's of the futures from `fs`.
          * The returned future is completed only once all of the futures in `fs` have been completed.
          */
        def allAsTrys[T](fItems: /* future items */ List[Future[T]]): Future[List[Try[T]]] = {
          val listOfFutureTrys: List[Future[Try[T]]] = fItems.map(futureToFutureTry)
          Future.sequence(listOfFutureTrys)
        }
    
        def futureToFutureTry[T](f: Future[T]): Future[Try[T]] = {
          f.map(Success(_)) .recover({case x => Failure(x)})
        }
    
        def allFailedAsTrys[T](fItems: /* future items */ List[Future[T]]): Future[List[Try[T]]] = {
          allAsTrys(fItems).map(_.filter(_.isFailure))
        }
    
        def allSucceededAsTrys[T](fItems: /* future items */ List[Future[T]]): Future[List[Try[T]]] = {
          allAsTrys(fItems).map(_.filter(_.isSuccess))
        }
    }
    
    
    // Tests... 
    
    
    
      // allAsTrys tests
      //
      test("futureToFutureTry returns Success if no exception") {
        val future =  Future.futureToFutureTry(Future{"mouse"})
        Thread.sleep(0, 100)
        val futureValue = future.value
        assert(futureValue == Some(Success(Success("mouse"))))
      }
      test("futureToFutureTry returns Failure if exception thrown") {
        val future =  Future.futureToFutureTry(Future{throw new IllegalStateException("bad news")})
        Thread.sleep(5)            // need to sleep a LOT longer to get Exception from failure case... interesting.....
        val futureValue = future.value
    
        assertResult(true) {
          futureValue match {
            case Some(Success(Failure(error: IllegalStateException)))  => true
          }
        }
      }
      test("Future.allAsTrys returns Nil given Nil list as input") {
        val future =  Future.allAsTrys(Nil)
        assert ( Await.result(future, 100 nanosecond).isEmpty )
      }
      test("Future.allAsTrys returns successful item even if preceded by failing item") {
        val future1 =  Future{throw new IllegalStateException("bad news")}
        var future2 = Future{"dog"}
    
        val futureListOfTrys =  Future.allAsTrys(List(future1,future2))
        val listOfTrys =  Await.result(futureListOfTrys, 10 milli)
        System.out.println("successItem:" + listOfTrys);
    
        assert(listOfTrys(0).failed.get.getMessage.contains("bad news"))
        assert(listOfTrys(1) == Success("dog"))
      }
      test("Future.allAsTrys returns successful item even if followed by failing item") {
        var future1 = Future{"dog"}
        val future2 =  Future{throw new IllegalStateException("bad news")}
    
        val futureListOfTrys =  Future.allAsTrys(List(future1,future2))
        val listOfTrys =  Await.result(futureListOfTrys,  10 milli)
        System.out.println("successItem:" + listOfTrys);
    
        assert(listOfTrys(1).failed.get.getMessage.contains("bad news"))
        assert(listOfTrys(0) == Success("dog"))
      }
      test("Future.allFailedAsTrys returns the failed item and only that item") {
        var future1 = Future{"dog"}
        val future2 =  Future{throw new IllegalStateException("bad news")}
    
        val futureListOfTrys =  Future.allFailedAsTrys(List(future1,future2))
        val listOfTrys =  Await.result(futureListOfTrys,  10 milli)
        assert(listOfTrys(0).failed.get.getMessage.contains("bad news"))
        assert(listOfTrys.size == 1)
      }
      test("Future.allSucceededAsTrys returns the succeeded item and only that item") {
        var future1 = Future{"dog"}
        val future2 =  Future{throw new IllegalStateException("bad news")}
    
        val futureListOfTrys =  Future.allSucceededAsTrys(List(future1,future2))
        val listOfTrys =  Await.result(futureListOfTrys,  10 milli)
        assert(listOfTrys(0) == Success("dog"))
        assert(listOfTrys.size == 1)
      }
    
  • 1

    我刚刚遇到了这个问题并提供了另一种解决方案:

    def allSuccessful[A, M[X] <: TraversableOnce[X]](in: M[Future[A]])
                                                    (implicit cbf: CanBuildFrom[M[Future[A]], A, M[A]], 
                                                     executor: ExecutionContext): Future[M[A]] = {
        in.foldLeft(Future.successful(cbf(in))) {
          (fr, fa) ⇒ (for (r ← fr; a ← fa) yield r += a) fallbackTo fr
        } map (_.result())
    }
    

    这里的想法是,在折叠中,您正在等待列表中的下一个元素完成(使用for-comprehension语法),如果下一个元素失败,您只需回退到已有的内容 .

  • 1

    您可以使用选项轻松包装未来结果,然后展平列表:

    def futureToFutureOption[T](f: Future[T]): Future[Option[T]] =
        f.map(Some(_)).recover {
          case e => None
        }
    val listOfFutureOptions = listOfFutures.map(futureToFutureOption(_))
    
    val futureListOfOptions = Future.sequence(listOfFutureOptions)
    
    val futureListOfSuccesses = futureListOfOptions.flatten
    
  • 6

    Scala 2.12在 Future.transform 上有一个改进,它适用于代码较少的anwser .

    val futures = Seq(Future{1},Future{throw new Exception})
    
    val seq = Future.sequence(futures.map(_.transform(Success(_)))) // instead of map and recover
    
    val successes = seq.map(_.collect{case Success(x)=>x})
    //successes: Future[Seq[Int]] = Future(Success(List(1)))
    
    val failures = seq.map(_.collect{case Failure(x)=>x})
    //failures: Future[Seq[Throwable]] = Future(Success(List(java.lang.Exception)))
    

相关问题