首页 文章

Akka HTTP:将来阻止阻止服务器

提问于
浏览
35

我正在尝试使用Akka HTTP来基本验证我的请求 . 碰巧我有一个外部资源来进行身份验证,因此我必须对此资源进行休息调用 .

这需要一些时间,并且在处理时,我的API的其余部分似乎被阻止,等待此调用 . 我用一个非常简单的例子重现了这个:

// used dispatcher:
implicit val system = ActorSystem()
implicit val executor = system.dispatcher
implicit val materializer = ActorMaterializer()


val routes = 
  (post & entity(as[String])) { e =>
    complete {
      Future{
        Thread.sleep(5000)
        e
      }
    }
  } ~
  (get & path(Segment)) { r =>
    complete {
      "get"
    }
  }

如果我发布到日志 endpoints ,我的get endpoints 也会等待5秒,这是日志 endpoints 所指示的 .

这是预期的行为,如果是,如何在不阻止整个API的情况下进行阻止操作?

2 回答

  • 124

    你观察到的是预期的行为 - 当然它非常糟糕 . 很好,已知的解决方案和最佳实践可以防范它 . 在这个答案中,我想花一些时间来解释这个问题的简短,长久,然后深入 - 享受阅读!

    Short answer :“不要阻止路由基础设施!”,总是使用专用的调度程序来阻止操作!

    Cause of the observed symptom: 问题是你使用 context.dispatcher 作为阻塞期货执行的调度程序 . 路由基础结构使用相同的调度程序(简单地说就是"bunch of threads")来实际处理传入的请求 - 因此,如果阻止所有可用的线程,最终会使路由基础结构处于饥饿状态 . (争论和基准测试的一点是,如果Akka HTTP可以保护这一点,我会将其添加到我的研究todo-list中) .

    必须特别小心处理阻塞,以免影响同一调度程序的其他用户(这就是为什么我们将执行分成如此简单的原因),如Akka文档部分所述:Blocking needs careful management .

    我想引起注意的其他事情是,如果可能的话,应该尽量避免阻止API - 如果你的长时间运行操作不是真正的一个操作,而是一系列操作,你可以将它们分成不同的参与者或顺序未来 . 无论如何,只是想指出 - 如果可能的话,避免这种阻止呼叫,但如果你必须 - 那么下面将解释如何正确处理这些 .

    In-depth analysis and solutions

    现在我们知道出了什么问题,从概念上讲,让我们看看上面的代码究竟是什么,以及如何正确解决这个问题:

    颜色=线程状态:

    • 绿松石 - 睡觉

    • orange - 等待

    • 绿色 - RUNNABLE

    现在让我们调查3段代码以及调度程序的影响以及应用程序的性能 . 要强制执行此操作,应用程序已置于以下负载下:

    • [a]继续请求GET请求(参见上面的代码中的初始问题),它没有阻止

    • [b]然后经过一段时间的火灾2000 POST请求,这将在返回未来之前导致5秒阻塞

    1) [bad] Dispatcher behaviour on bad code

    // BAD! (due to the blocking in Future):
    implicit val defaultDispatcher = system.dispatcher
    
    val routes: Route = post { 
      complete {
        Future { // uses defaultDispatcher
          Thread.sleep(5000)                    // will block on the default dispatcher,
          System.currentTimeMillis().toString   // starving the routing infra
        }
      }
    }
    

    所以我们将我们的app暴露给[a]加载,你可以看到许多akka.actor.default-dispatcher线程 - 他们正在处理请求 - 小绿色片段,而橙色意味着其他人实际上在那里闲置 .

    blocking is killing the default dispatcher

    然后我们启动[b]加载,这导致阻塞这些线程 - 你可以看到早期线程“default-dispatcher-2,3,4”在之前空闲之后进入阻塞状态 . 我们还观察到池增长 - 新线程启动“default-dispatcher-18,19,20,21 ......”但是他们立即进入睡眠状态(!) - 我们在这里浪费宝贵的资源!

    此类启动线程的数量取决于默认的调度程序配置,但可能不会超过50左右 . 由于我们刚刚解雇了2k阻塞操作,我们将整个线程池挨饿 - 阻塞操作占主导地位,因此路由infra没有可用于处理其他请求的线程 - 非常糟糕!

    让我们做一些事情(这是一个Akka最佳实践btw - 总是隔离阻塞行为,如下所示):

    2) [good!] Dispatcher behaviour good structured code/dispatchers

    在您的 application.conf 配置此调度程序专门用于阻止行为:

    my-blocking-dispatcher {
      type = Dispatcher
      executor = "thread-pool-executor"
      thread-pool-executor {
        // in Akka previous to 2.4.2:
        core-pool-size-min = 16
        core-pool-size-max = 16
        max-pool-size-min = 16
        max-pool-size-max = 16
        // or in Akka 2.4.2+
        fixed-pool-size = 16
      }
      throughput = 100
    }
    

    您应该阅读Akka Dispatchers文档中的更多内容,以了解各种选项 . 但主要的一点是,我们选择了一个 ThreadPoolExecutor ,它具有线程的硬限制,可用于阻塞操作 . 大小设置取决于您的应用程序的功能以及服务器具有的核心数 .

    接下来我们需要使用它,而不是默认的:

    // GOOD (due to the blocking in Future):
    implicit val blockingDispatcher = system.dispatchers.lookup("my-blocking-dispatcher")
    
    val routes: Route = post { 
      complete {
        Future { // uses the good "blocking dispatcher" that we configured, 
                 // instead of the default dispatcher – the blocking is isolated.
          Thread.sleep(5000)
          System.currentTimeMillis().toString
        }
      }
    }
    

    我们使用相同的负载对应用程序施压,首先是一些正常的请求,然后我们添加阻塞的请求 . 这是ThreadPools的方式在这种情况下表现:

    the blocking pool scales to our needs

    所以最初普通请求很容易被默认调度程序处理,你可以在那里看到一些绿线 - 这是实际的执行(我实际上并没有把服务器置于高负载下,所以它主要是空闲的) .

    现在,当我们开始发出阻塞操作时, my-blocking-dispatcher-* 启动,并启动配置线程的数量 . 它处理所有睡眠 . 此外,在这些线程上发生一段时间后,它会关闭它们 . 如果我们用另一堆阻塞命中服务器,那么池会启动新线程来处理sleep() - 但同时 - 我们不会在"just stay there and do nothing"上浪费我们宝贵的线程 .

    使用此设置时,正常GET请求的吞吐量没有受到影响,他们仍然很乐意在(仍然相当免费)默认调度程序上提供服务 .

    这是处理反应式应用程序中任何类型阻塞的推荐方法 . 它通常被称为"bulkheading"(或"isolating")应用程序的不良行为部分,在这种情况下,不良行为是睡眠/阻塞 .

    3) [workaround-ish] Dispatcher behaviour when blocking applied properly

    在这个例子中,我们使用scaladoc for scala.concurrent.blocking方法,这可以在面对阻塞操作时提供帮助 . 它通常会导致更多的线程被旋转以在阻塞操作中存活 .

    // OK, default dispatcher but we'll use `blocking`
    implicit val dispatcher = system.dispatcher
    
    val routes: Route = post { 
      complete {
        Future { // uses the default dispatcher (it's a Fork-Join Pool)
          blocking { // will cause much more threads to be spun-up, avoiding starvation somewhat, 
                     // but at the cost of exploding the number of threads (which eventually
                     // may also lead to starvation problems, but on a different layer)
            Thread.sleep(5000)
            System.currentTimeMillis().toString
           }
        }
      }
    }
    

    该应用程序将表现如下:

    blocking causes more threads to be started

    您会注意到创建了很多新线程,这是因为在"oh, this'll be blocking, so we need more threads"处阻止了提示 . 这导致我们运行大量(不受控制的)线程的总时间,与2)解决方案相反,我们确切地知道我们专门用于阻塞行为的线程数 .

    Summing up :永远不要阻止默认调度程序:-)

    最佳实践是使用 2) 中显示的模式,为可用的阻塞操作提供调度程序,并在那里执行它们 .

    希望这有帮助,快乐的hakking!

    Discussed Akka HTTP version2.0.1

    Profiler used: 很多人私下问我这个回答是什么用于在上面的图片中可视化线程状态的探查器,所以在这里添加这些信息:我使用了YourKit这是一个很棒的商业分析器(OSS免费),尽管你可以使用免费的VisualVM from OpenJDK实现相同的结果 .

  • 3

    奇怪,但对我来说一切正常(没有阻塞) . 这是代码:

    import akka.actor.ActorSystem
    import akka.http.scaladsl.Http
    import akka.http.scaladsl.server.Directives._
    import akka.http.scaladsl.server.Route
    import akka.stream.ActorMaterializer
    
    import scala.concurrent.Future
    
    
    object Main {
    
      implicit val system = ActorSystem()
      implicit val executor = system.dispatcher
      implicit val materializer = ActorMaterializer()
    
      val routes: Route = (post & entity(as[String])) { e =>
        complete {
          Future {
            Thread.sleep(5000)
            e
          }
        }
      } ~
        (get & path(Segment)) { r =>
          complete {
            "get"
          }
        }
    
      def main(args: Array[String]) {
    
        Http().bindAndHandle(routes, "0.0.0.0", 9000).onFailure {
          case e =>
            system.shutdown()
        }
      }
    }
    

    您也可以将异步代码包装到 onCompleteonSuccess 指令中:

    onComplete(Future{Thread.sleep(5000)}){e} 
    
    onSuccess(Future{Thread.sleep(5000)}){complete(e)}
    

相关问题