我使用Processor API创建kafka流应用程序 .
以下是我创建主题以将时间戳附加到所有传入消息的方法
kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic topicName --config message.timestamp.type = CreateTime
工作流正在处理来自源主题的传入消息并将其发布到sink主题 . 出于某种奇怪的原因,我在源和宿主题消息中看到了相同的时间戳 . 比如说,在源主题中,消息创建时间是 T0 ,在sink主题中也是如此 .
我需要做些什么来查看sink主题消息中的更新时间戳?
1 回答
如果使用
CreateTime
配置主题,则时间戳存储将是 生产环境 者提供的时间戳 .对于普通
KafkaProducer
,您没有明确指定时间戳,KafkaProducer
使用System.currentTimeMillis()
并将消息发送给代理 .对于Kafka Streams,如果您读取具有特定时间戳的输入记录,我们将专用时间戳推理逻辑来计算结果记录的时间戳 . 因此,Kafka Streams在将其交给内部使用的
KafkaProducer
时明确设置时间戳,因此 生产环境 者只使用此时间戳并且不使用当前的挂钟时间 . 对于流处理,这通常是期望的行为 .如果您有一个简单的管道只将数据从一个主题复制到另一个主题,则时间戳推断将使用输入记录时间戳作为输出记录时间戳 .
你可以做两件事来获得不同的语义:
为您配置
WallClockTimestampExtractor
Kafka Streams应用程序 . 对于这种情况,Kafka Stream不会使用嵌入式记录时间戳,而是使用当前的挂钟时间来获取输出记录的时间戳 .使用
AppendTime
而不是CreateTime
配置输出主题 . 对于这种情况,代理总是用当前代理挂钟时间覆盖 生产环境 者提供的记录时间戳 .