我正在使用Kafka JDK客户端ver 0.10.2.1
. 我能够为Kafka生成一个简单的消息进行"heartbeat"测试,但是我无法使用sdk消费来自同一主题的消息 . 当我进入Kafka CLI时,我能够使用该消息,所以我已经确认消息在那里 . 这里's the function I' m用于从我的Kafka服务器消费,使用道具 - 只有在我确实确认 produce()
成功之后才将我生成的消息传递给主题,如果请求我可以稍后发布该功能:
private def consumeFromKafka(topic: String, expectedMessage: String): Boolean = {
val props: Properties = initProps("consumer")
val consumer = new KafkaConsumer[String, String](props)
consumer.subscribe(List(topic).asJava)
var readExpectedRecord = false
try {
val records = {
val firstPollRecs = consumer.poll(MAX_POLLTIME_MS)
// increase timeout and try again if nothing comes back the first time in case system is busy
if (firstPollRecs.count() == 0) firstPollRecs else {
logger.info("KafkaHeartBeat: First poll had 0 records- trying again - doubling timeout to "
+ (MAX_POLLTIME_MS * 2)/1000 + " sec.")
consumer.poll(MAX_POLLTIME_MS * 2)
}
}
records.forEach(rec => {
if (rec.value() == expectedMessage) readExpectedRecord = true
})
} catch {
case e: Throwable => //log error
} finally {
consumer.close()
}
readExpectedRecord
}
private def initProps(propsType: String): Properties = {
val prop = new Properties()
prop.put("bootstrap.servers", kafkaServer + ":" + kafkaPort)
propsType match {
case "producer" => {
prop.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
prop.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
prop.put("acks", "1")
prop.put("producer.type", "sync")
prop.put("retries", "3")
prop.put("linger.ms", "5")
}
case "consumer" => {
prop.put("group.id", groupId)
prop.put("enable.auto.commit", "false")
prop.put("auto.commit.interval.ms", "1000")
prop.put("session.timeout.ms", "30000")
prop.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
prop.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
prop.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
// poll just once, should only be one record for the heartbeat
prop.put("max.poll.records", "1")
}
}
prop
}
现在当我运行代码时,这是它在控制台中输出的内容:
13:04:21 - 为组0b8947e1-eb68-4af3-ac7b-be3f7c02e76e发现了协调器serverName:9092(id:2147483647 rack:null) . 13:04:23 INFO oakcciConsumerCoordinator - 为组0b8947e1-eb68-4af3-ac7b-be3f7c02e76e 13:04:24撤销之前指定的分区[] INFO oakcciAbstractCoordinator - (重新)加入组0b8947e1-eb68-4af3-ac7b-be3f7c02e76e 13:04:25 INFO oakcciAbstractCoordinator - 已成功加入组0b8947e1-eb68-4af3-ac7b-be3f7c02e76e与第1代13:04:26 INFO oakcciConsumerCoordinator - 设置新分配的分区[HeartBeat_Topic.Service_5.2018-08-03.13_04_10.377 -0]对于组0b8947e1-eb68-4af3-ac7b-be3f7c02e76e 13:04:27 INFO cpplutil.KafkaHeartBeatUtil - KafkaHeartBeat:第一次民意调查有0条记录 - 再次尝试 - 将超时加倍到60秒 .
然后没有别的,没有抛出错误 - 所以没有记录任何记录 . 有谁知道是什么阻止了'消费'的发生?订阅者似乎是成功的,因为我能够成功调用listTopics并列出partions没问题 .
1 回答
您的代码有错误 . 看来你的路线:
应该这样说
否则,你传入一个空的
firstPollRecs
,然后迭代它,显然什么都不返回 .