首页 文章

使用Kafka Streams在输出中设置时间戳

提问于
浏览
2

我在Kafka主题“原始数据”中获取CSV,目标是通过使用正确的时间戳(每行不同)发送另一个主题“数据”中的每一行来转换它们 .

目前,我有2个飘带:

  • 一个用于拆分"raw-data"中的行,将它们发送到"internal"主题(无时间戳)

  • 一个 TimestampExtractor 消耗"internal"并将它们发送到"data" .

我想通过直接设置时间戳来删除使用这个“内部”主题但我找不到方法(时间戳提取器仅在消费时使用) .

我在文档中偶然发现了这一行:

注意,可以在处理器API中更改描述默认行为,方法是在调用#forward()时明确地为输出记录分配时间戳 .

但我找不到任何带时间戳的签名 . 他们的意思是什么?

你会怎么做?

编辑:要清楚,我有一个Kafka主题,其中一条消息包含事件时间和一些值,例如:

2018-01-01,hello 2018-01-02,world (这是一条消息,而不是两条消息)

我想在另一个主题中获取两条消息,其中Kafka记录时间戳设置为其事件时间(2018-01-01和2018-01-02),而不需要中间主题 .

1 回答

  • 2

    设置输出的时间戳需要Kafka Streams 2.0,并且仅在Processor API中受支持 . 如果使用DSL,则可以使用 transform() 来使用这些API .

    正如您所指出的,您将使用 context.forward() . 电话会是:

    stream.transform(new TransformerSupplier() {
      public Transformer get() {
        return new Transformer() {
          // omit other methods for brevity
          // you need to get the `context` from `init()`
    
          public KeyValue transform(K key, V value) {
            // some business logic
    
            // you can call #forward() as often as you want
            context.forward(newKey, newValue, To.all().withTimestamp(newTimestamp));
    
            return null; // only return data via context#forward()
          }
        }
      }
    });
    

相关问题