首页 文章

是否使用源队列实现线程安全的akka-http中的连接池?

提问于
浏览
6

参考以下提到的实现:

http://doc.akka.io/docs/akka-http/10.0.5/scala/http/client-side/host-level.html

val poolClientFlow = Http().cachedHostConnectionPool[Promise[HttpResponse]]("akka.io")
val queue =
  Source.queue[(HttpRequest, Promise[HttpResponse])](QueueSize, OverflowStrategy.dropNew)
    .via(poolClientFlow)
    .toMat(Sink.foreach({
      case ((Success(resp), p)) => p.success(resp)
      case ((Failure(e), p))    => p.failure(e)
    }))(Keep.left)
    .run()

从多个线程提供队列http请求是否可以安全线程?如果不是,那么实施此类要求的最佳方式是什么?也许使用一个专门的演员?

2 回答

  • 2

    不,它不是线程安全的,根据api docSourceQueue that current source is materialized to is for single thread usage only.

    一个专门的演员可以正常工作但是,如果可以,使用 Source.actorRefdoc link)而不是 Source.queue 会更容易 .

    一般来说, Source.actorRef 的缺点是缺乏背压,但是当你使用 OverflowStrategy.dropNew 时,很明显你不要指望背压 . 因此,您可以使用 Source.actorRef 获得相同的行为 .

  • 2

    正如@ frederic-a正确陈述的那样, SourceQueue 不是线程安全的解决方案 .

    也许合适的解决方案是使用 MergeHub (有关详细信息,请参阅docs) . 这有效地允许您分两个阶段运行图表 .

    从您的集线器到水槽的

    • (这实现了到水槽)

    • 将第1点物化的接收器分发给您的用户 . Sink s实际上是为了分发而设计的,所以这是非常安全的 .

    根据 MergeHub 行为,该解决方案将是安全的背压

    如果消费者无法跟上,那么所有的 生产环境 者都会受到压力 .

    代码示例如下:

    val reqSink: Sink[(HttpRequest, Promise[HttpResponse]), NotUsed] =
      MergeHub.source[(HttpRequest, Promise[HttpResponse])](perProducerBufferSize = 16)
      .via(poolClientFlow)
      .toMat(Sink.foreach({
        case ((Success(resp), p)) => p.success(resp)
        case ((Failure(e), p))    => p.failure(e)
      }))(Keep.left)
      .run()
    
    // on the user threads
    
    val source: Source[(HttpRequest, Promise[HttpResponse]), NotUsed] = ???
    source.runWith(reqSink)
    

相关问题