我正在构建一个Akka应用程序,并希望将某些actor的FSM状态转换暴露给外部消费者 . (最终目标是能够将状态转换消息推送到websocket,以便可以实时查看它们 . )
根据文档Combining dynamic stages to build a simple Publish-Subscribe service,看起来我需要公开代表pub-sub通道的Flow,以便消费者和 生产环境 者可以使用它 .
我遇到问题的部分是将新的Source附加到Flow,以便生成的每个新actor将其状态转换发布到Source . 另一个问题是向Flow添加新的接收器(最后,这将是websockets,但出于测试目的,它可能是任何新的接收器) .
首先,我连接MergeHub和BroadcastHub以形成“通道”,然后从物化接收器和源创建流:
val orderFlow: Flow[String, String, NotUsed] = {
val (sink, source) = MergeHub.source[String](16)
.toMat(BroadcastHub.sink(256))(Keep.both).run()
Flow.fromSinkAndSource(sink, source)
}
那么问题是如何动态地将新 生产环境 者和消费者添加到此流程?有任何想法吗?
1 回答
对于这个特殊问题,我不会使用
akka-stream
. 您描述的多播pub-sub类型更适合原始Actor
消息传递和EventStream .在某些情况下,我是akka-stream的忠实粉丝,但在这种情况下,我认为你正试图通过圆孔安装方形钉 .