我在我的应用程序中处理了Kafka Streams:
myStream
.mapValues(customTransformer::transform)
.groupByKey(Serialized.with(new Serdes.StringSerde(), new SomeCustomSerde()))
.windowedBy(TimeWindows.of(10000L).advanceBy(10000L))
.aggregate(CustomCollectorObject::new,
(key, value, aggregate) -> aggregate.collect(value),
Materialized.<String, CustomCollectorObject, WindowStore<Bytes, byte[]>>as("some_store_name")
.withValueSerde(new CustomCollectorSerde()))
.toStream()
.foreach((k, v) -> /* do something very important */);
Expected behavior: 传入消息按密钥分组,并在 CustomCollectorObject
中聚合在一定时间间隔内 . CustomCollectorObject
只是一个里面有 List
的类 . 在 foreach
中每10秒钟后,我正在使用我的聚合数据做一些非常重要的事情 . 什么是非常重要的我期望每10秒调用 foreach
!
Actual behavior: 我可以看到我的 foreach
中的处理被称为稀有,大约每30-35秒,这并不重要 . 什么是非常重要的,我一次收到3-4条消息 .
The question is: 如何达到预期的行为?我需要在运行时处理我的数据,没有任何延迟 .
我试图设置 cache.max.bytes.buffering: 0
,但在这种情况下,窗口根本不起作用 .
1 回答
Kafka Streams具有不同的执行模型并提供不同的语义,即,您的期望与Kafka Streams的功能不匹配 . 已经存在多个类似的问题:
How to send final kafka-streams aggregation result of a time windowed KTable?
https://www.confluent.io/blog/watermarks-tables-event-time-dataflow-model/
https://www.confluent.io/blog/streams-tables-two-sides-same-coin
另请注意,社区目前正在开发一个名为
suppress()
的新运算符,它将能够提供您想要的语义:https://cwiki.apache.org/confluence/display/KAFKA/KIP-328%3A+Ability+to+suppress+updates+for+KTables现在,你需要添加一个带有状态存储的
transform()
,并使用标点来获得你想要的语义(c.f. https://docs.confluent.io/current/streams/developer-guide/processor-api.html#defining-a-stream-processor)