我有一个Spark scala使用者连接到另一个集群上的Kafka代理(Kafka集群与CDH集群分开) . params 是我正确拾取的 Kafka 参数 .

val incomingstream = KafkaUtils.createDirectStream[String, String](
  streamingContext, .....](topicSet, params))
print(incomingstream)

我能够在我的Kafka集群的控制台上制作和使用 . 但是在运行具有上述代码的spark消费者时,它只是在等待,即使我通过kafka控制台生成器发送消息,它也不会显示在日志打印上 . incomingstream 没有打印 .

我有从运行spark作业的节点到kafka集群的连接 . 以纱线模式提交 . 显示与kafka经纪人的连接 . (不确定问题是否是因为Kerberos ...在日志中没有说..)

使用CDH 5.10

Spark 2.2

Kafka 0.10

斯卡拉2.11.8

编辑:Kafka Params传递如下 . 从火花作业连接好我的kafka经纪人 - 打印日志

"bootstrap.servers" -> "<domain>:9092",
    "key.deserializer" -> classOf[StringDeserializer],
    "value.deserializer" -> classOf[StringDeserializer],
    "auto.offset.reset" -> "latest",
    "enable.auto.commit" -> (false: java.lang.Boolean),
    "security.protocol" -> "PLAINTEXT"

我的Kafka监听器被配置为明文(而不是SSL) - 但如果我通过上述内容,那就是抱怨

Selector:375 - Connection with 10.18.63.18 disconnected
java.io.EOFException
    at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:83)
    at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:71)