我们正在使用Kafka stream的SessionWindows来聚合相关事件的到来 . 与聚合一起,我们使用 until()
API指定窗口的保留时间 . Stream info :
会话窗口(不活动时间)为1分钟,传递到 until()
的保留时间为2分钟 . 我们使用自定义 TimestampExtractor
来映射事件的时间 .
例:
事件:e1; eventTime:上午10:00:00;到达时间:下午2点(当天)
事件:e2; eventTime:上午10:00:30;到达时间下午2:10(当天)
第二次事件的到达时间是e1到达后10分钟,超过保留时间不活动时间 . 但是旧的事件e1仍然是聚合的一部分,尽管保留时间为2分钟 .
问题:
1)kafka如何使用 until()
API清理状态存储?由于指定为参数的保留值是"lower bound for how long a window will be maintained."当窗口被清除时?
2)是否有后台线程定期清理状态存储?如果是,那么有没有办法确定窗口被清除的实际时间 .
3)在保留时间之后清除窗口数据的任何流配置 .
1 回答
在回答您的具体问题之前:请注意,保留时间不是基于系统时间,而是基于"stream time" . "Stream time"是基于
TimestampExtractor
返回的内部跟踪时间进度而不会进入太多细节:对于具有2条记录的示例,当第二条记录到达时"stream time"将提前30秒,因此保留时间尚未过去 .另请注意,如果没有新数据到达,则"stream time"不会提前(至少一个分区) . This holds for Kafka 0.11.0 and older but might change in future releases.
对你的问题:
(1)Kafka Streams将所有商店更新写入changelog主题和本地RocksDB商店 . 两者都分为具有一定大小的所谓段 . 如果新数据到达(即,"stream time"进展),则创建新段 . 如果发生这种情况,如果旧段中的所有记录都早于保留时间(即,记录时间戳小于"stream time"减去保留时间),则删除旧段 .
(2)因此,没有后台线程,但清理是常规处理的一部分,
(3)没有强制清除旧记录/窗口的配置 .
如果所有记录都已过期,则会删除整个段,段中的旧记录(很可能是较小/较旧的时间戳)维持的时间长于保留时间 . 这种设计背后的动机是性能:在每个记录的基础上到期将太昂贵 .