是否使用源队列实现线程安全的akka-http中的连接池?

参考以下提到的实现:

http://doc.akka.io/docs/akka-http/10.0.5/scala/http/client-side/host-level.html

val poolClientFlow = Http().cachedHostConnectionPool[Promise[HttpResponse]]("akka.io")
val queue =
  Source.queue[(HttpRequest, Promise[HttpResponse])](QueueSize, OverflowStrategy.dropNew)
    .via(poolClientFlow)
    .toMat(Sink.foreach({
      case ((Success(resp), p)) => p.success(resp)
      case ((Failure(e), p))    => p.failure(e)
    }))(Keep.left)
    .run()

从多个线程提供队列http请求是否可以安全线程?如果不是,那么实施此类要求的最佳方式是什么?也许使用一个专门的演员?

回答(2)

2 years ago

不,它不是线程安全的,根据api docSourceQueue that current source is materialized to is for single thread usage only.

一个专门的演员可以正常工作但是,如果可以,使用 Source.actorRefdoc link)而不是 Source.queue 会更容易 .

一般来说, Source.actorRef 的缺点是缺乏背压,但是当你使用 OverflowStrategy.dropNew 时,很明显你不要指望背压 . 因此,您可以使用 Source.actorRef 获得相同的行为 .

2 years ago

正如@ frederic-a正确陈述的那样, SourceQueue 不是线程安全的解决方案 .

也许合适的解决方案是使用 MergeHub (有关详细信息,请参阅docs) . 这有效地允许您分两个阶段运行图表 .

从您的集线器到水槽的

  • (这实现了到水槽)

  • 将第1点物化的接收器分发给您的用户 . Sink s实际上是为了分发而设计的,所以这是非常安全的 .

根据 MergeHub 行为,该解决方案将是安全的背压

如果消费者无法跟上,那么所有的 生产环境 者都会受到压力 .

代码示例如下:

val reqSink: Sink[(HttpRequest, Promise[HttpResponse]), NotUsed] =
  MergeHub.source[(HttpRequest, Promise[HttpResponse])](perProducerBufferSize = 16)
  .via(poolClientFlow)
  .toMat(Sink.foreach({
    case ((Success(resp), p)) => p.success(resp)
    case ((Failure(e), p))    => p.failure(e)
  }))(Keep.left)
  .run()

// on the user threads

val source: Source[(HttpRequest, Promise[HttpResponse]), NotUsed] = ???
source.runWith(reqSink)