我在Kafka主题“原始数据”中获取CSV,目标是通过使用正确的时间戳(每行不同)发送另一个主题“数据”中的每一行来转换它们 .
目前,我有2个飘带:
-
一个用于拆分"raw-data"中的行,将它们发送到"internal"主题(无时间戳)
-
一个
TimestampExtractor
消耗"internal"并将它们发送到"data" .
我想通过直接设置时间戳来删除使用这个“内部”主题但我找不到方法(时间戳提取器仅在消费时使用) .
我在文档中偶然发现了这一行:
注意,可以在处理器API中更改描述默认行为,方法是在调用#forward()时明确地为输出记录分配时间戳 .
但我找不到任何带时间戳的签名 . 他们的意思是什么?
你会怎么做?
编辑:要清楚,我有一个Kafka主题,其中一条消息包含事件时间和一些值,例如:
2018-01-01,hello 2018-01-02,world
(这是一条消息,而不是两条消息)
我想在另一个主题中获取两条消息,其中Kafka记录时间戳设置为其事件时间(2018-01-01和2018-01-02),而不需要中间主题 .
1 回答
设置输出的时间戳需要Kafka Streams 2.0,并且仅在Processor API中受支持 . 如果使用DSL,则可以使用
transform()
来使用这些API .正如您所指出的,您将使用
context.forward()
. 电话会是: