首页 文章

Kafka主题到websocket

提问于
浏览
2

我正在尝试实现一个设置,其中我有多个Web浏览器打开与我的akka-http服务器的websocket连接,以便读取发布到kafka主题的所有消息 .

所以消息流应该这样

kafka topic -> akka-http -> websocket connection 1 
                         -> websocket connection 2
                         -> websocket connection 3

现在我已经为websocket创建了一个路径:

val route: Route = 
 path("ws") {
   handleWebSocketMessages(notificationWs)
 }

然后我为我的kafka主题创建了一个消费者:

val consumerSettings = ConsumerSettings(system,
  new ByteArrayDeserializer, new StringDeserializer)
    .withBootstrapServers("localhost:9092")
    .withGroupId("group1")
    .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
val source = Consumer
  .plainSource(consumerSettings, Subscriptions.topics("topic1"))

最后我想将此源连接到handleWebSocketMessages中的websocket

def handleWebSocketMessages: Flow[Message, Message, Any] =
  Flow[Message].mapConcat {
    case tm: TextMessage =>
      TextMessage(source)::Nil
    case bm: BinaryMessage =>
      // ignore binary messages but drain content to avoid the stream being clogged
      bm.dataStream.runWith(Sink.ignore)
      Nil
  }

这是我尝试在TextMessage中使用 source 时出现的错误:

错误:(77,9)重载方法值适用于替代方法:(textStream:akka.stream.scaladsl.Source [String,Any])akka.http.scaladsl.model.ws.TextMessage(text:String)akka.http .scaladsl.model.ws.TextMessage.Strict无法应用于(akka.stream.scaladsl.Source [org.apache.kafka.clients.consumer.ConsumerRecord [Array [Byte],String],akka.kafka.scaladsl.Consumer .Control])TextMessage(source):: Nil

我想我在这个过程中犯了很多错误,但我会说最阻塞的部分是 handleWebSocketMessages .

1 回答

  • 3

    首先,要了解源是类型: Source[ConsumerRecord[K, V], Control] . 所以,这不是你可以作为TextMessage的参数传递的东西 .

    现在,让我们从websocket的角度来看:

    • 为Kafka源中的每条消息构建传出消息 . 该消息将是来自Kafka消息的String转换的TextMessage .

    • 对于每个传入的消息,只需println()即可

    因此, Flow 可以看作两个组件: SourceSink .

    val incomingMessages: Sink[Message, NotUsed] =
      Sink.foreach(println(_))
    
    val outgoingMessages: Source[Message, NotUsed] =
      source
        .map { consumerRecord => TextMessage(consumerRecord.record.value) }
    
    val handleWebSocketMessages: Flow[Message, Message, Any]  
      = Flow.fromSinkAndSource(incomingMessages, outgoingMessages)
    

    希望能帮助到你 .

相关问题