我的Kafka Streams聚合读取了一个紧凑的主题并执行此操作:
(0_10, ..)
, (0_11, ..)
--->
(0, [10])
(0, [10, 11])
我想知道如何控制聚合时间窗口,因此它不会为每个传入的消息发送消息,而是等待并聚合其中的一些消息 . Imagine Stream App使用以下消息:
-
(0_10, ..)
-
(1_11, ..)
-
(0_13, ..)
如果以前的3条消息在短时间内到达,我希望看到:
-
(0,[10])
-
(0, [10, 13])
-
(1, [11])
我无法弄清楚,在吐出新值之前,如何告诉我的Kafka Stream应用程序等待更多聚合需要多长时间 .
我的代码非常简单
builder
.table(keySerde, valueSerde, sourceTopic)
.groupBy(StreamBuilder::groupByMapper)
.aggregate(
StreamBuilder::aggregateInitializer,
StreamBuilder::aggregateAdder,
StreamBuilder::aggregateSubtractor)
.to(...);
目前,它有时会批量聚合,但不确定如何调整它:
{"Aggregate":[100]}
{"Aggregate":[100,300,301,302]}
{"Aggregate":[100,300,301,302,404]}
1 回答
对于Kafka Streams ' windowing. Generally speaking, Kafka Streams windows don' t "close"或"end"这是不可能的,因为你可以't tell it to produce a final result once a window 1751528 (there'没有这样的概念) . 这是为了适应迟到的结果 . 当消息到达聚合窗口时,您将看到更新 . Kafka Streams吐出更新的频率取决于缓存(见下文) . 欲了解更多信息,请参阅:How to send final kafka-streams aggregation result of a time windowed KTable?
您在那里看到的最有可能是在
KTables
的商店中缓存的结果 .KTables
仅在更改日志刷新并提交其偏移量时转发下游消息 . 这是为了在需要恢复状态时保持一致性 . 如果更改Kafka Streams ' application'的提交间隔,则缓存刷新频率会降低,因此您将看到从KTable
转发的更新更少(更改日志,聚合等) . 但那与窗口无关 .尽管如此,如果您想要对更改日志流进行窗口化聚合,可以使用
KTable#toStream()
将其从KTable
转换为KStream
. 然后,您可以在聚合步骤中指定窗口 .