首页 文章

Kafka Streams GlobalKTable与应用程序的同步

提问于
浏览
0

通过使用普通的k-streams,kafka将每个应用程序的偏移量存储在其内部偏移主题上 . 在应用程序重新启动时,应用程序将根据 auto.offset.reset 策略重新处理主题 . 这确实解释了here .

我正在使用kafka-stream的 GlobalKTable 来复制应用程序上的数据 . 但是重启后 it's not populated on applications whose id (StreamsConfig.APPLICATION_ID_CONFIG) does not change 重启(由于部署或崩溃) . 每当我使用新id启动stream应用程序的新实例时,都会填充 GlobalKTable .

GlobalKTable 与启用了日志压缩功能的主题完全不同 . 创建StreamsBuilder#globalTable状态的javadoc:

streamsBuilder.globalTable("some-topic", Materialized.as("kglobaltable-store"))

请注意,无论StreamsConfig中的指定值如何,GlobalKTable始终应用“auto.offset.reset”策略“earliest” .

因此,我希望, regardless of the application id ,我的流应用程序从头开始读取 kglobaltable-store 主题并在本地填充商店github issue . 似乎javadoc引用的主题是 some-topic 而不是 kglobaltable-store .

这是 GlobalKTable 的预期行为吗?此外,是否有支持 GlobalKTables 主题的保留政策?

当我们在 some-topic 上有保留策略时,此行为还会导致 kglobaltable-store 主题上的陈旧数据 . 一个例子如下:

在时间t0,让;

some-topic:(1,a) - >(2,b) - >(1,c)

kglobaltable-store:[(1,c),(2,b)]

一段时间后(2,b)受保留,我启动我的流应用程序(带有一个新的id),我的 GlobalKTable 只存储记录(1,c)如果是这种情况 .

EDIT: 我正在使用 InMemoryKeyValueStore .

1 回答

  • 1

    因为你正在使用 InMemoryKeyValueStore 我假设你遇到了这个错误:https://issues.apache.org/jira/browse/KAFKA-6711

    作为解决方法,您可以删除全局存储的本地检查点文件(cf GlobalKTable checkpoints) - 这将在重新启动时触发引导 . 或者您切换回默认 RocksDB 商店 .

    顺便说一句:如果您直接将某个主题作为表或全局表读取,Kafka Streams不会为容错创建额外的更改日志主题,但为此目的使用原始输入主题(这会降低Kafka群集中的存储要求) . 因此,那些输入主题应该启用日志压缩 .

相关问题