我正在使用FluentD(第12版稳定版)向Kafka发送消息 . 但是FluentD使用旧的KafkaProducer,因此记录时间戳始终设置为-1 . 因此,我必须使用WallclockTimestampExtractor将记录的时间戳设置为当消息到达kafka时的时间点 .
我真正感兴趣的时间戳是由流利的信息发送的:
“timestamp”:“1507885936”,“host”:“V.X.Y.Z . ”
Kafka 的记录表示:
offset = 0,timestamp = - 1,key = null,value = {“timestamp”:“1507885936”,“host”:“V.X.Y.Z . ”}
我想在 Kafka 有这样的记录:
offset = 0,timestamp = 1507885936,key = null,value = {“timestamp”:“1507885936”,“host”:“V.X.Y.Z . ”}
我的解决方法如下: - 编写一个消费者来提取时间戳(https://kafka.apache.org/0110/javadoc/org/apache/kafka/streams/processor/TimestampExtractor.html)
- 编写 生产环境 者以生成设置了时间戳的新记录(ProducerRecord(String topic,Integer partition,Long timestamp,K key,V value)
如果有的话,我更喜欢KafkaStreams解决方案 .
1 回答
您可以编写一个非常简单的Kafka Streams应用程序,例如:
并使用自定义
TimestampExtractor
配置应用程序,从记录中提取时间戳并将其返回 .在将记录写回Kafka时,Kafka Streams将使用返回的时间戳 .