
Scala: Merge sort using Futures times out


The following sequential merge sort returns its result very quickly:

def mergeSort(xs: List[Int]): List[Int] = {
    def merge(xs: List[Int], ys: List[Int]): List[Int] = (xs, ys) match {
      case (Nil, _) => ys
      case (_, Nil) => xs
      case (x :: xs1, y :: ys1) => if (x <= y) x :: merge(xs1, ys) else y :: merge(xs, ys1)
    }
    val mid = xs.length / 2
    if (mid <= 0) xs
    else {
      val (xs1, ys1) = xs.splitAt(mid)



      merge(mergeSort(xs1), mergeSort(ys1))
    }
  }

  val newList = (1 to 10000).toList.reverse

  mergeSort(newList)

However, when I try to parallelize it using Futures, it times out:

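import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.language.postfixOps

def mergeSort(xs: List[Int]): List[Int] = {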
    def merge(xs: List[Int], ys: List[Int]): List[Int] = (xs, ys) match {
      case (Nil, _) => ys
      case (_, Nil) => xs
      case (x :: xs1, y :: ys1) => if (x <= y) x :: merge(xs1, ys) else y :: merge(xs, ys1)
    }
    val mid = xs.length / 2
    if (mid <= 0) xs
    else {
      val (xs1, ys1) = xs.splitAt(mid)
      val sortedList1 = Future{mergeSort(xs1)}
      val sortedList2 = Future{mergeSort(ys1)}

      merge(Await.result(sortedList1,5 seconds), Await.result(sortedList2,5 seconds))
    }
  }

  val newList = (1 to 10000).toList.reverse

  mergeSort(newList)

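I get a timeout exception. I understand this is probably because the code spawns log2 10000 threads, which adds a lot of delay, since the execution context's thread pool may not have that many threads.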

1.) How can I exploit the inherent parallelism in merge sort and parallelize this code?

2.) For which use cases are Futures useful, and when should they be avoided?

Edit 1: Refactored code based on the feedback I've gotten so far:

import scala.annotation.tailrec

def mergeSort(xs: List[Int]): Future[List[Int]] = {

    @tailrec
    def merge(xs: List[Int], ys: List[Int], acc: List[Int]): List[Int] = (xs, ys) match {
      case (Nil, _) => acc.reverse ::: ys
      case (_, Nil) => acc.reverse ::: xs
      case (x :: xs1, y :: ys1) => if (x <= y) merge(xs1, ys, x :: acc) else merge(xs, ys1, y :: acc)
    }

    val mid = xs.length / 2
    if (mid <= 0) Future {
      xs
    }
    else {
      val (xs1, ys1) = xs.splitAt(mid)
      val sortedList1 = mergeSort(xs1)
      val sortedList2 = mergeSort(ys1)
      for (s1 <- sortedList1; s2 <- sortedList2) yield merge(s1, s2, List())
    }
  }

1 Answer

  • 1

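    Generally, when using Futures you should a) wait as little as possible and prefer to do the work inside Futures, and b) pay attention to which execution context you are using.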

    As an example of a), here is how you could change this:

    def mergeSort(xs: List[Int]): Future[List[Int]] = {
      def merge(xs: List[Int], ys: List[Int]): List[Int] = (xs, ys) match {
        case (Nil, _) => ys
        case (_, Nil) => xs
        case (x :: xs1, y :: ys1) => if (x <= y) x :: merge(xs1, ys) else y :: merge(xs, ys1)
      }
      val mid = xs.length / 2
      if (mid <= 0) Future(xs)
      else {
        val (xs1, ys1) = xs.splitAt(mid)
        val sortedList1 = mergeSort(xs1)
        val sortedList2 = mergeSort(ys1)
        for (s1 <- sortedList1; s2 <- sortedList2) yield merge(s1, s2)
      }
    }
    val newList = (1 to 10000).toList.reverse
    
    Await.result(mergeSort(newList), 5 seconds)
    

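    However, there is still a lot of overhead here. Usually you would only parallelize sizable chunks of work, so that the overhead does not dominate. In this case that likely means falling back to the single-threaded version once the recursion reaches a list below some constant size.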
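
The fallback described above could look roughly like the following. This is only a sketch under assumptions: the `ParallelMergeSort` object, the `sequentialSort` helper, and the `Threshold` value of 1024 are names and numbers made up for illustration, and the right cutoff would have to be found by measuring. It uses the global execution context and the tail-recursive `merge` from the question's Edit 1.

```scala
import scala.annotation.tailrec
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object ParallelMergeSort {
  // Hypothetical cutoff: lists of this size or smaller are sorted without new Futures.
  val Threshold = 1024

  // Tail-recursive merge of two sorted lists (same idea as in the question's Edit 1).
  @tailrec
  def merge(xs: List[Int], ys: List[Int], acc: List[Int] = Nil): List[Int] = (xs, ys) match {
    case (Nil, _) => acc.reverse ::: ys
    case (_, Nil) => acc.reverse ::: xs
    case (x :: xs1, y :: ys1) =>
      if (x <= y) merge(xs1, ys, x :: acc) else merge(xs, ys1, y :: acc)
  }

  // Plain single-threaded merge sort, used for small sublists.
  def sequentialSort(xs: List[Int]): List[Int] = {
    val mid = xs.length / 2
    if (mid <= 0) xs
    else {
      val (left, right) = xs.splitAt(mid)
      merge(sequentialSort(left), sequentialSort(right))
    }
  }

  // Splits into Futures only while the list is larger than Threshold.
  def mergeSort(xs: List[Int]): Future[List[Int]] =
    if (xs.length <= Threshold) Future(sequentialSort(xs))
    else {
      val (left, right) = xs.splitAt(xs.length / 2)
      // Create both Futures before the for-comprehension so the halves can run concurrently.
      val sortedLeft = mergeSort(left)
      val sortedRight = mergeSort(right)
      for (l <- sortedLeft; r <- sortedRight) yield merge(l, r)
    }
}
```

It can be called the same way as before, blocking only once at the outermost level: `Await.result(ParallelMergeSort.mergeSort((1 to 10000).toList.reverse), 5.seconds)`. With a cutoff of 1024, a 10000-element list is split into only 16 sequentially sorted chunks instead of one Future per element.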
