首页 文章

Kafka Streams窗口聚合批处理

提问于
浏览
0

我在我的应用程序中处理了Kafka Streams:

myStream
    .mapValues(customTransformer::transform)
    .groupByKey(Serialized.with(new Serdes.StringSerde(), new SomeCustomSerde()))
    .windowedBy(TimeWindows.of(10000L).advanceBy(10000L))
    .aggregate(CustomCollectorObject::new,
            (key, value, aggregate) -> aggregate.collect(value),
            Materialized.<String, CustomCollectorObject, WindowStore<Bytes, byte[]>>as("some_store_name")
                    .withValueSerde(new CustomCollectorSerde()))
    .toStream()
    .foreach((k, v) -> /* do something very important */);

Expected behavior: 传入消息按密钥分组,并在 CustomCollectorObject 中聚合在一定时间间隔内 . CustomCollectorObject 只是一个里面有 List 的类 . 在 foreach 中每10秒钟后,我正在使用我的聚合数据做一些非常重要的事情 . 什么是非常重要的我期望每10秒调用 foreach

Actual behavior: 我可以看到我的 foreach 中的处理被称为稀有,大约每30-35秒,这并不重要 . 什么是非常重要的,我一次收到3-4条消息 .

The question is: 如何达到预期的行为?我需要在运行时处理我的数据,没有任何延迟 .

我试图设置 cache.max.bytes.buffering: 0 ,但在这种情况下,窗口根本不起作用 .

1 回答

相关问题