首页 文章

如何在Akka Stream Kafka ConsumerSettings中指定Kafka Zookeeper endpoints

提问于
浏览
1

我有一个Kafka Broker,它运行在 Cloud 端的某个地方,当我试图通过命令行消费者工具从中消耗时,我可以使用消息 . 但是,当我在我的akka-stream kafka ConsumerSettings中放置相同的 endpoints 时,它无法正常工作 .
例如: - bin / kafka-console-consumer.sh --zookeeper hostname:2181 / xxx / yyy --topic topic-name这对我有用 . 但是,当我通过ConsumerSetting做它不起作用时,同样的事情

val consumerSettings = ConsumerSettings(actorSystem, new ByteArrayDeserializer, new StringDeserializer)
  .withBootstrapServers("hostname:2181/xxx/yyy")
  .withGroupId(groupID)
  .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")

Consumer.committableSource(consumerSettings, Subscriptions.topics("topic-name"))

1 回答

  • 1

    如果指定--zookeeper,则使用旧的(和不安全的)使用者 . 如果基于 Cloud 的kafka代理设置正确,您应该能够使用console-consumer和--bootstrap-server param而不使用--zookeeper . 如果这不起作用,则可能无法使用外部advertised.listeners ip或主机名正确设置群集 . 您可能还希望防火墙关闭对zookeeper的直接客户端访问 .

相关问题