参考以下提到的实现:
http://doc.akka.io/docs/akka-http/10.0.5/scala/http/client-side/host-level.html
val poolClientFlow = Http().cachedHostConnectionPool[Promise[HttpResponse]]("akka.io")
val queue =
Source.queue[(HttpRequest, Promise[HttpResponse])](QueueSize, OverflowStrategy.dropNew)
.via(poolClientFlow)
.toMat(Sink.foreach({
case ((Success(resp), p)) => p.success(resp)
case ((Failure(e), p)) => p.failure(e)
}))(Keep.left)
.run()
从多个线程提供队列http请求是否可以安全线程?如果不是,那么实施此类要求的最佳方式是什么?也许使用一个专门的演员?
2 回答
不,它不是线程安全的,根据api doc:
SourceQueue that current source is materialized to is for single thread usage only.
一个专门的演员可以正常工作但是,如果可以,使用
Source.actorRef
(doc link)而不是Source.queue
会更容易 .一般来说,
Source.actorRef
的缺点是缺乏背压,但是当你使用OverflowStrategy.dropNew
时,很明显你不要指望背压 . 因此,您可以使用Source.actorRef
获得相同的行为 .正如@ frederic-a正确陈述的那样,
SourceQueue
不是线程安全的解决方案 .也许合适的解决方案是使用
MergeHub
(有关详细信息,请参阅docs) . 这有效地允许您分两个阶段运行图表 .从您的集线器到水槽的
(这实现了到水槽)
将第1点物化的接收器分发给您的用户 .
Sink
s实际上是为了分发而设计的,所以这是非常安全的 .根据
MergeHub
行为,该解决方案将是安全的背压代码示例如下: