首页 文章

在Kafka Streams应用程序中更新本地状态存储库

提问于
浏览
0

我在Kafka中有一个日志压缩的主题,我正在KTable中阅读并创建一个商店 . 每当我更新键控消息时,我都没有看到应用程序运行时的最新状态 . 但是当我重新启动流应用程序时,我能够看到更新的状态 . 如何在没有重新启动的情况下运行流应用程序时获取更新状态 .

这是我用来迭代状态值的代码(在main函数中) .

final String queryableStore = metaTable.queryableStoreName();

    ReadOnlyKeyValueStore<String,String> metaTableView = null;
    try {
        metaTableView = waitUntilStoreIsQueryable(queryableStore,QueryableStoreTypes.keyValueStore(),streams);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    metaTableView.all().forEachRemaining(row -> logger.info("Key: "+row.key+" Value: "+row.value));

1 回答

  • 3

    不确定如何运行代码 . 但如果我理解正确,你希望 logger.info 打印每次更新?如果是这种情况,那么这将不起作用,因为 .all() 遍历KTable一次并在之后终止 . 请注意,您在代码示例中使用的"Interactive Queries"是为一次性查找而设计的 . 您需要在自己的代码中重新运行查询 - 当然,您每次都会得到所有记录 .

    如果您想连续获取KTable更新(并且只有更新),您需要直接使用KTable . 例如,你可以这样做

    KTable table = builder.table(...);
    table.toStream().foreach(...);
    

    对KTable的每次更新都将生成由 foreach() 处理的输出记录 . 注意记录缓存;您可能希望禁用缓存以获取所有更新,否则,某些更新可能无法转发,因为它们受缓存限制 .

相关问题