我有一个Spark scala使用者连接到另一个集群上的Kafka代理(Kafka集群与CDH集群分开) . params
是我正确拾取的 Kafka 参数 .
val incomingstream = KafkaUtils.createDirectStream[String, String](
streamingContext, .....](topicSet, params))
print(incomingstream)
我能够在我的Kafka集群的控制台上制作和使用 . 但是在运行具有上述代码的spark消费者时,它只是在等待,即使我通过kafka控制台生成器发送消息,它也不会显示在日志打印上 . incomingstream
没有打印 .
我有从运行spark作业的节点到kafka集群的连接 . 以纱线模式提交 . 显示与kafka经纪人的连接 . (不确定问题是否是因为Kerberos ...在日志中没有说..)
使用CDH 5.10
Spark 2.2
Kafka 0.10
斯卡拉2.11.8
编辑:Kafka Params传递如下 . 从火花作业连接好我的kafka经纪人 - 打印日志
"bootstrap.servers" -> "<domain>:9092",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"auto.offset.reset" -> "latest",
"enable.auto.commit" -> (false: java.lang.Boolean),
"security.protocol" -> "PLAINTEXT"
我的Kafka监听器被配置为明文(而不是SSL) - 但如果我通过上述内容,那就是抱怨
Selector:375 - Connection with 10.18.63.18 disconnected
java.io.EOFException
at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:83)
at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:71)