首页 文章

Kafka Streams窗口如何工作?

提问于
浏览
0

我很难理解Windowing在Kafka Streams中是如何工作的 . 结果似乎与我到目前为止所阅读和理解的内容不一致 .

我创建了一个带有支持主题的KSQL Stream . KSQL SELECT语句中的一个“列”已被指定为该主题的TIMESTAMP .

CREATE STREAM my_stream WITH (KAFKA_topic='my-stream-topic', VALUE_FORMAT='json', TIMESTAMP='recorded_timestamp') AS select <select list> PARTITION BY PARTITION_KEY;

my-stream-topic中的记录按键(PARTITION_KEY)分组,并使用跳跃窗口加窗

val dataWindowed: TimeWindowedKStream[String, TopicValue] = builder.stream('my-stream-topic', consumed) 
    .groupByKey(Serialized.`with`(Serdes.String(), valueSerde))
    .windowedBy(TimeWindows.`of`(TimeUnit.MINUTES.toMillis(5)).advanceBy(TimeUnit.MINUTES.toMillis(1)).until(TimeUnit.MINUTES.toMillis(5)))

记录通过汇总

val dataAgg: KTable[Windowed[String], ValueStats] = dataWindowed
    .aggregate(
      new Initializer[TopicStats] {<code omitted>}},
      new Aggregator[String, TopicValue, TopicStats] {<code omitted>}},
      Materialized.`as`[String, TopicStats, WindowStore[Bytes, Array[Byte]]]("time-windowed-aggregated-stream-store")
        .withValueSerde(new JSONSerde[TopicStats])
    )

  val topicStats: KStream[String, TopicValueStats] = dataAgg
    .toStream()
    .map( <code omitted for brevity>)

然后我通过打印到控制台

dataAgg.print()
topicStats.print()

组中的第一个窗口转换为7:00 - 7:05

当我通过控制台消费者检查my-stream-topic中的记录时,我发现有2条记录属于上述窗口 . 但是,聚合器只能获取其中一个 .

我认为dataAgg窗口KTable将包含分组密钥的1条记录,但聚合将使用2条记录来计算聚合 . 打印的总值不正确 .

我错过了什么?

1 回答

  • 1

    KSQL可以在写入时设置记录时间戳,但是您需要在创建输入流时指定时间戳,而不是在定义输出流时 . 即,为输入流指定的时间戳将用于在写入时设置记录元数据字段 .

    这种行为相当不直观,我为此问题打开了一张票:https://github.com/confluentinc/ksql/issues/1367

    因此,在为问题中显示的查询创建输入流时,需要指定 with(TIMESTAMP='recorded_timestamp') 子句 . 如果无法做到这一点,因为您的查询需要在不同的时间戳上运行,您需要指定将数据复制到新主题的第二个查询 .

    CREATE STREAM my_stream_with_ts
        WITH (KAFKA_topic='my-stream-topic-with-ts')
    AS select * from my_stream PARTITION BY PARTITION_KEY;
    

    作为替代方案,您可以为Kafka Streams应用程序设置自定义时间戳提取器,以从有效负载中提取时间戳 .

相关问题