首页 文章

Akka http连接池

提问于
浏览
2

我正在尝试为我们的akka http应用程序使用客户端连接池 . 但是,一旦达到最大连接数,请求似乎就会挂起 . 我已将问题浓缩为以下内容:

import java.lang.Thread.UncaughtExceptionHandler
import java.net.ServerSocket
import akka.actor.ActorSystem
import akka.http.scaladsl.settings.ConnectionPoolSettings
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.{HttpRequest, HttpResponse, Uri}
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}
import akka.http.scaladsl.client.RequestBuilding._

import scala.annotation.tailrec
import scala.util.{Success, Try}

object AkkaProblem extends App {
    val server = new ServerSocket(0)
    val serverPort = server.getLocalPort

    object responder extends Runnable with UncaughtExceptionHandler {
       val cr = '\r'
       val httpResponse =
  s"""HTTP/1.1 404 Not Found$cr
      |Content-Type: application/json;charset=UTF-8$cr
      |Date: Mon, 26 Sep 2016 06:30:13 GMT$cr
      |Connection: keep-alive$cr
      |Transfer-Encoding: chunked$cr
      |$cr
      |12$cr
      |{"Hello": "World"}$cr
      |0$cr
      |$cr
      |""".stripMargin

       override final def run(): Unit = {
           val socket = server.accept()
           @tailrec def sendResponse(): Unit = {
               socket.getOutputStream.write(httpResponse.getBytes)
               sendResponse()
           }

           sendResponse()
       }

       override def uncaughtException(t: Thread, e: Throwable): Unit = ()
    }

    for (nr <- 1 to 4) {
        val thread = new Thread(responder, s"response-thread-$nr")
        thread.setUncaughtExceptionHandler(responder)
        thread.setDaemon(true)
        thread.start()
    }


    implicit val system = ActorSystem("main")
    import system.dispatcher
    implicit val mat = ActorMaterializer()

    val serverUri = Uri(s"http://localhost:$serverPort")
    val request = Get(serverUri)

    val poolFlow: Flow[(HttpRequest, Unit), (Try[HttpResponse], Unit), Http.HostConnectionPool] =
        Http().newHostConnectionPool(serverUri.authority.host.address, serverUri.authority.port, ConnectionPoolSettings("max-connections: 4"))

   val source = Source.repeat(request).take(1000).map((_, ()))

   val runRequest = source.viaMat(poolFlow)(Keep.right).toMat(Sink.seq)(Keep.both)
   val (connectionPool, response) = runRequest.run()

   response.map(_.map(_._1)).andThen {
   case Success(responses) =>
       val byResultType = responses.groupBy(_.isSuccess).mapValues(_.size)

       println(s"Received response. Got ${byResultType.get(true)} successes, ${byResultType.get(false)} errors")
       connectionPool.shutdown() andThen {
       case done =>
           println("Connection pool shut down")
           system.terminate()
       }
   }
}

我希望该计划能够相对快速地报告1000次成功,然后关闭 . 相反,它无限期地挂起 . 当请求数量降低以匹配允许的连接数时,问题将自行解决 .

作为一种解决方法,我们可以为每个连接使用一个自己的池,但这样做会破坏拥有池的目的 .

堆栈转储显示没有死锁或其他明显的错误行为:

Java HotSpot(TM) 64-Bit Server VM warning: ignoring option MaxPermSize=1000m; support was removed in 8.0
2016-09-26 13:24:18
Full thread dump Java HotSpot(TM) 64-Bit Server VM (25.60-b23 mixed mode):

"Attach Listener" #25 daemon prio=9 os_prio=31 tid=0x00007f86bf001000 nid=0x3307 waiting on condition [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"main-akka.actor.default-dispatcher-10" #24 prio=5 os_prio=31 tid=0x00007f86bbb4a000 nid=0x6b03 waiting on condition [0x000000011d717000]
   java.lang.Thread.State: WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for   (a akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinPool)
        at scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2075)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

"main-akka.actor.default-dispatcher-9" #23 prio=5 os_prio=31 tid=0x00007f86bc402800 nid=0x6903 waiting on condition [0x000000011d614000]
   java.lang.Thread.State: WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for   (a akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinPool)
        at scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2075)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

"main-akka.actor.default-dispatcher-8" #22 prio=5 os_prio=31 tid=0x00007f86bbb49800 nid=0x6703 waiting on condition [0x000000011d511000]
   java.lang.Thread.State: WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for   (a akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinPool)
        at scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2075)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

"main-akka.actor.default-dispatcher-7" #21 prio=5 os_prio=31 tid=0x00007f86bb292000 nid=0x6503 waiting on condition [0x000000011d40e000]
   java.lang.Thread.State: WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for   (a akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinPool)
        at scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2075)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

"main-akka.io.pinned-dispatcher-6" #20 prio=5 os_prio=31 tid=0x00007f86bcbcd000 nid=0x6407 runnable [0x000000011d10b000]
   java.lang.Thread.State: RUNNABLE
        at sun.nio.ch.KQueueArrayWrapper.kevent0(Native Method)
        at sun.nio.ch.KQueueArrayWrapper.poll(KQueueArrayWrapper.java:198)
        at sun.nio.ch.KQueueSelectorImpl.doSelect(KQueueSelectorImpl.java:103)
        at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
        - locked  (a sun.nio.ch.Util$2)
        - locked  (a java.util.Collections$UnmodifiableSet)
        - locked  (a sun.nio.ch.KQueueSelectorImpl)
        at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
        at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:101)
        at akka.io.SelectionHandler$ChannelRegistryImpl$$anon$3.tryRun(SelectionHandler.scala:115)
        at akka.io.SelectionHandler$ChannelRegistryImpl$Task.run(SelectionHandler.scala:219)
        at akka.io.SelectionHandler$ChannelRegistryImpl$$anon$3.run(SelectionHandler.scala:148)
        at akka.util.SerializedSuspendableExecutionContext.run$1(SerializedSuspendableExecutionContext.scala:67)
        at akka.util.SerializedSuspendableExecutionContext.run(SerializedSuspendableExecutionContext.scala:71)
        at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)

"DestroyJavaVM" #19 prio=5 os_prio=31 tid=0x00007f86bcba2800 nid=0xd03 waiting on condition [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"main-akka.actor.default-dispatcher-5" #18 prio=5 os_prio=31 tid=0x00007f86bc252800 nid=0x5d03 waiting on condition [0x000000011c488000]
   java.lang.Thread.State: WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for   (a akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinPool)
        at scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2075)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

"main-akka.actor.default-dispatcher-4" #17 prio=5 os_prio=31 tid=0x00007f86bc823800 nid=0x5b03 waiting on condition [0x000000011c185000]
   java.lang.Thread.State: WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for   (a akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinPool)
        at scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2075)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

"main-akka.actor.default-dispatcher-3" #16 prio=5 os_prio=31 tid=0x00007f86bba26000 nid=0x5903 waiting on condition [0x000000011c082000]
   java.lang.Thread.State: TIMED_WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for   (a akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinPool)
        at scala.concurrent.forkjoin.ForkJoinPool.idleAwaitWork(ForkJoinPool.java:2135)
        at scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2067)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

"main-akka.actor.default-dispatcher-2" #15 prio=5 os_prio=31 tid=0x00007f86bc256000 nid=0x5703 waiting on condition [0x000000011bf7f000]
   java.lang.Thread.State: WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for   (a akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinPool)
        at scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2075)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

"main-scheduler-1" #14 prio=5 os_prio=31 tid=0x00007f86bc248800 nid=0x5503 waiting on condition [0x000000011b8d2000]
   java.lang.Thread.State: TIMED_WAITING (sleeping)
        at java.lang.Thread.sleep(Native Method)
        at akka.actor.LightArrayRevolverScheduler.waitNanos(LightArrayRevolverScheduler.scala:87)
        at akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:268)
        at akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:238)
        at java.lang.Thread.run(Thread.java:745)

"response-thread-4" #13 daemon prio=5 os_prio=31 tid=0x00007f86bc195000 nid=0x5303 runnable [0x000000011b7cf000]
   java.lang.Thread.State: RUNNABLE
        at java.net.SocketOutputStream.socketWrite0(Native Method)
        at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:109)
        at java.net.SocketOutputStream.write(SocketOutputStream.java:141)
        at de.fashionid.gatekeeper.AkkaBugReport$responder$.sendResponse$1(AkkaBugReport.scala:41)
        at de.fashionid.gatekeeper.AkkaBugReport$responder$.run(AkkaBugReport.scala:45)
        at java.lang.Thread.run(Thread.java:745)

"response-thread-3" #12 daemon prio=5 os_prio=31 tid=0x00007f86bb0fa800 nid=0x5103 runnable [0x000000011b6cc000]
   java.lang.Thread.State: RUNNABLE
        at java.net.SocketOutputStream.socketWrite0(Native Method)
        at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:109)
        at java.net.SocketOutputStream.write(SocketOutputStream.java:141)
        at de.fashionid.gatekeeper.AkkaBugReport$responder$.sendResponse$1(AkkaBugReport.scala:41)
        at de.fashionid.gatekeeper.AkkaBugReport$responder$.run(AkkaBugReport.scala:45)
        at java.lang.Thread.run(Thread.java:745)

"response-thread-2" #11 daemon prio=5 os_prio=31 tid=0x00007f86bb9ca000 nid=0x4f03 runnable [0x000000011b5c9000]
   java.lang.Thread.State: RUNNABLE
        at java.net.SocketOutputStream.socketWrite0(Native Method)
        at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:109)
        at java.net.SocketOutputStream.write(SocketOutputStream.java:141)
        at de.fashionid.gatekeeper.AkkaBugReport$responder$.sendResponse$1(AkkaBugReport.scala:41)
        at de.fashionid.gatekeeper.AkkaBugReport$responder$.run(AkkaBugReport.scala:45)
        at java.lang.Thread.run(Thread.java:745)

"response-thread-1" #10 daemon prio=5 os_prio=31 tid=0x00007f86bb9c1000 nid=0x4d03 runnable [0x000000011b4c6000]
   java.lang.Thread.State: RUNNABLE
        at java.net.SocketOutputStream.socketWrite0(Native Method)
        at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:109)
        at java.net.SocketOutputStream.write(SocketOutputStream.java:141)
        at de.fashionid.gatekeeper.AkkaBugReport$responder$.sendResponse$1(AkkaBugReport.scala:41)
        at de.fashionid.gatekeeper.AkkaBugReport$responder$.run(AkkaBugReport.scala:45)
        at java.lang.Thread.run(Thread.java:745)

"Monitor Ctrl-Break" #9 daemon prio=5 os_prio=31 tid=0x00007f86bb078000 nid=0x4b03 runnable [0x000000011b101000]
   java.lang.Thread.State: RUNNABLE
        at java.net.PlainSocketImpl.socketAccept(Native Method)
        at java.net.AbstractPlainSocketImpl.accept(AbstractPlainSocketImpl.java:409)
        at java.net.ServerSocket.implAccept(ServerSocket.java:545)
        at java.net.ServerSocket.accept(ServerSocket.java:513)
        at com.intellij.rt.execution.application.AppMain$1.run(AppMain.java:79)
        at java.lang.Thread.run(Thread.java:745)

"Service Thread" #8 daemon prio=9 os_prio=31 tid=0x00007f86bc810800 nid=0x4703 runnable [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"C1 CompilerThread2" #7 daemon prio=9 os_prio=31 tid=0x00007f86bc805800 nid=0x4503 waiting on condition [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"C2 CompilerThread1" #6 daemon prio=9 os_prio=31 tid=0x00007f86bb866000 nid=0x4303 waiting on condition [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"C2 CompilerThread0" #5 daemon prio=9 os_prio=31 tid=0x00007f86bb833800 nid=0x4103 waiting on condition [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"Signal Dispatcher" #4 daemon prio=9 os_prio=31 tid=0x00007f86bb844000 nid=0x3e23 runnable [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"Finalizer" #3 daemon prio=8 os_prio=31 tid=0x00007f86bc001000 nid=0x2b03 in Object.wait() [0x0000000118c24000]
   java.lang.Thread.State: WAITING (on object monitor)
        at java.lang.Object.wait(Native Method)
        at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:143)
        - locked  (a java.lang.ref.ReferenceQueue$Lock)
        at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:164)
        at java.lang.ref.Finalizer$FinalizerThread.run(Finalizer.java:209)

"Reference Handler" #2 daemon prio=10 os_prio=31 tid=0x00007f86bb81f000 nid=0x2903 in Object.wait() [0x0000000118b21000]
   java.lang.Thread.State: WAITING (on object monitor)
        at java.lang.Object.wait(Native Method)
        at java.lang.Object.wait(Object.java:502)
        at java.lang.ref.Reference$ReferenceHandler.run(Reference.java:157)
        - locked  (a java.lang.ref.Reference$Lock)

"VM Thread" os_prio=31 tid=0x00007f86bb014800 nid=0x2703 runnable 

"GC task thread#0 (ParallelGC)" os_prio=31 tid=0x00007f86bc005000 nid=0x1f03 runnable 

"GC task thread#1 (ParallelGC)" os_prio=31 tid=0x00007f86bc005800 nid=0x2103 runnable 

"GC task thread#2 (ParallelGC)" os_prio=31 tid=0x00007f86bc006800 nid=0x2303 runnable 

"GC task thread#3 (ParallelGC)" os_prio=31 tid=0x00007f86bc007000 nid=0x2503 runnable 

"VM Periodic Task Thread" os_prio=31 tid=0x00007f86bb80c000 nid=0x4903 waiting on condition 

JNI global references: 250

1 回答

  • 3

    您需要从请求中明确使用 HttpResponse 的实体(主体) . 因为响应的实体实际上是一个流,所以如果不使用它,它会保持连接打开 . documentation详细说明了请求响应周期 . 服务器必须在标头中发送 Connection: close ,或者您必须附加一些 Sink (例如 Sink.ignore )来使用流 .

    在实践中,有几种方法可以处理 HttpResponse . 一种是调用 HttpResponsetoStrict(timeout: FiniteDuration) 方法,它将获得整个实体并关闭连接 . timeout 限制HTTP请求等待发送方响应的时间 . 如果您对实体不感兴趣,也可以在 HttpResponse 上调用 discardEntityBytes() 方法 . 最后,您可以通过一些有效的接收器(例如 Unmarshal(resp.entity).to[SomeClass] )来使用流 .

相关问题