我们有以下问题:
我们想听某些Kafka主题并构建它的“历史” - 所以对于指定的密钥提取一些数据,将其添加到该密钥的现有列表中(或者如果它不存在则创建一个新的)将它放到另一个主题,它只有一个分区,并且是高度压缩的 . 另一个应用程序可以只听取该主题并更新它的历史列表 .
我在想它如何适应Kafka流库 . 我们当然可以使用聚合:
msgReceived.map((key, word) -> new KeyValue<>(key, word))
.groupBy((k,v) -> k, stringSerde, stringSerde)
.aggregate(String::new,
(k, v, stockTransactionCollector) -> stockTransactionCollector + "|" + v,
stringSerde, "summaries2")
.to(stringSerde, stringSerde, "transaction-summary50");
它创建了一个由Kafka支持的本地存储,并将其用作历史表 .
我担心的是,如果我们决定扩展这样的应用程序,每个正在运行的实例将创建一个新的支持主题 ${applicationId}-${storeName}-changelog
(我假设每个应用程序都有不同的 applicationId
) . 每个实例开始使用输入主题,获取不同的键集并构建状态的不同子集 . 如果Kafka决定重新 balancer ,一些实例将开始错过本地商店中的一些历史状态,因为它们会获得一组全新的分区 .
问题是,如果我只是为每个正在运行的实例设置相同的applicationId,它是否应该最终重放来自同一个kafka主题的所有数据,每个运行的实例具有相同的本地状态?
2 回答
为什么要创建具有不同ID的多个应用程序来执行相同的工作? Kafka实现并行的方式是通过任务:
如果您需要扩展应用程序,则可以启动运行相同应用程序的新实例(相同的应用程序ID),并且一些已分配的任务将重新分配给新实例 . 本地状态存储的迁移将由库自动处理:
我建议你看看this guide .
一些假设不正确:
因此,如果所有实例都使用相同的应用程序ID,则所有正在运行的应用程序实例将使用相同的更改日志主题名称,因此,您打算执行的操作应该是开箱即用的 .