通过使用普通的k-streams,kafka将每个应用程序的偏移量存储在其内部偏移主题上 . 在应用程序重新启动时,应用程序将根据 auto.offset.reset
策略重新处理主题 . 这确实解释了here .
我正在使用kafka-stream的 GlobalKTable
来复制应用程序上的数据 . 但是重启后 it's not populated on applications whose id (StreamsConfig.APPLICATION_ID_CONFIG) does not change 重启(由于部署或崩溃) . 每当我使用新id启动stream应用程序的新实例时,都会填充 GlobalKTable
.
GlobalKTable
与启用了日志压缩功能的主题完全不同 . 创建StreamsBuilder#globalTable状态的javadoc:
streamsBuilder.globalTable("some-topic", Materialized.as("kglobaltable-store"))
请注意,无论StreamsConfig中的指定值如何,GlobalKTable始终应用“auto.offset.reset”策略“earliest” .
因此,我希望, regardless of the application id ,我的流应用程序从头开始读取 kglobaltable-store
主题并在本地填充商店github issue . 似乎javadoc引用的主题是 some-topic
而不是 kglobaltable-store
.
这是 GlobalKTable
的预期行为吗?此外,是否有支持 GlobalKTables
主题的保留政策?
当我们在 some-topic
上有保留策略时,此行为还会导致 kglobaltable-store
主题上的陈旧数据 . 一个例子如下:
在时间t0,让;
some-topic:(1,a) - >(2,b) - >(1,c)
kglobaltable-store:[(1,c),(2,b)]
一段时间后(2,b)受保留,我启动我的流应用程序(带有一个新的id),我的 GlobalKTable
只存储记录(1,c)如果是这种情况 .
EDIT: 我正在使用 InMemoryKeyValueStore
.
1 回答
因为你正在使用
InMemoryKeyValueStore
我假设你遇到了这个错误:https://issues.apache.org/jira/browse/KAFKA-6711作为解决方法,您可以删除全局存储的本地检查点文件(cf GlobalKTable checkpoints) - 这将在重新启动时触发引导 . 或者您切换回默认
RocksDB
商店 .顺便说一句:如果您直接将某个主题作为表或全局表读取,Kafka Streams不会为容错创建额外的更改日志主题,但为此目的使用原始输入主题(这会降低Kafka群集中的存储要求) . 因此,那些输入主题应该启用日志压缩 .