我正在尝试实现一个设置,其中我有多个Web浏览器打开与我的akka-http服务器的websocket连接,以便读取发布到kafka主题的所有消息 .
所以消息流应该这样
kafka topic -> akka-http -> websocket connection 1
-> websocket connection 2
-> websocket connection 3
现在我已经为websocket创建了一个路径:
val route: Route =
path("ws") {
handleWebSocketMessages(notificationWs)
}
然后我为我的kafka主题创建了一个消费者:
val consumerSettings = ConsumerSettings(system,
new ByteArrayDeserializer, new StringDeserializer)
.withBootstrapServers("localhost:9092")
.withGroupId("group1")
.withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
val source = Consumer
.plainSource(consumerSettings, Subscriptions.topics("topic1"))
最后我想将此源连接到handleWebSocketMessages中的websocket
def handleWebSocketMessages: Flow[Message, Message, Any] =
Flow[Message].mapConcat {
case tm: TextMessage =>
TextMessage(source)::Nil
case bm: BinaryMessage =>
// ignore binary messages but drain content to avoid the stream being clogged
bm.dataStream.runWith(Sink.ignore)
Nil
}
这是我尝试在TextMessage中使用 source
时出现的错误:
错误:(77,9)重载方法值适用于替代方法:(textStream:akka.stream.scaladsl.Source [String,Any])akka.http.scaladsl.model.ws.TextMessage(text:String)akka.http .scaladsl.model.ws.TextMessage.Strict无法应用于(akka.stream.scaladsl.Source [org.apache.kafka.clients.consumer.ConsumerRecord [Array [Byte],String],akka.kafka.scaladsl.Consumer .Control])TextMessage(source):: Nil
我想我在这个过程中犯了很多错误,但我会说最阻塞的部分是 handleWebSocketMessages
.
1 回答
首先,要了解源是类型:
Source[ConsumerRecord[K, V], Control]
. 所以,这不是你可以作为TextMessage的参数传递的东西 .现在,让我们从websocket的角度来看:
为Kafka源中的每条消息构建传出消息 . 该消息将是来自Kafka消息的String转换的TextMessage .
对于每个传入的消息,只需println()即可
因此,
Flow
可以看作两个组件:Source
和Sink
.希望能帮助到你 .