首页 文章

Kafka Stream:KTable实现

提问于
浏览
1

如何确定某个主题的KTable实现何时完成?

对于例如假设KTable有几百万行 . 下面的伪代码:

KTable<String, String> kt = kgroupedStream.groupByKey(..).reduce(..); //Assume this produces few million rows

在某个时间点,我想安排一个线程调用以下内容,写入主题:kt.toStream() . to(“output_topic_name”);

我想确保所有数据都是作为上述调用的一部分编写的 . 此外,一旦调用了上述“to”方法,是否可以在下一个调度中调用它,或者第一个调用是否始终保持活动状态?

Follow-up Question:

约束
1)好的,我看到kafkastream启动后kstream和ktable是无界/无限的 . 但是,ktable实现(对于压缩的主题)不会在指定的时间段内为同一个密钥发送多个条目 .

因此,除非压缩过程尝试清除这些并且仅保留最新的压缩过程,否则下游应用程序将使用从主题查询相同密钥的所有可用条目,从而导致重复 . 即使压缩过程进行了某种程度的清理,也总是不可能在给定的时间点,当压缩过程正在赶上时,有一些键具有多个条目 .

我假设KTable在RocksDB中只有一条给定键的记录 . 如果我们有办法安排实现,那将有助于避免重复 . 此外,减少主题中持久存储的数据量(增加存储量),增加网络流量,压缩过程的额外开销以清理它 .

2)也许ReadOnlyKeyValueStore允许从商店进行受控检索,但它仍然缺乏安排检索密钥,值和写入主题的方法,这需要额外的编码 .

是否可以改进API以实现受控制的物化?

1 回答

  • 3

    KTable实现永远不会完成,你也不能"invoke" to() .

    当您使用Streams API时,您将“插入”DAG运算符 . 实际的方法调用,不会触发任何计算,而是修改运算符的DAG .

    只有在通过 KafkaStreams#start() 开始计算后才会处理数据 . 请注意,您指定的所有运算符将在计算开始后连续且并发运行 .

    没有"end of a computation"因为输入预期是无限/无限的,因为上游应用程序可以随时将新数据写入输入主题 . 因此,您的程序永远不会自行终止 . 如果需要,您可以通过 KafkaStreams#close() 停止计算 .

    在执行期间,您无法更改DAG . 如果要更改它,则需要停止计算并创建一个新的 KafkaStreams 实例,该实例将修改后的DAG作为输入

    Follow up:

    是 . 您必须将KTable视为"versioned table",当条目更新时,它会随着时间的推移而发展 . 因此,所有更新都将写入changelog主题,并作为更改记录发送到下游(请注意,KTables也会对同一个键进行一些缓存:cf. https://docs.confluent.io/current/streams/developer-guide/memory-mgmt.html) .

    将使用从主题查询相同键的所有可用条目,从而导致重复 .

    我不认为那些是"duplicates"而是更新 . 是的,应用程序需要能够正确处理这些更新 .

    如果我们有办法安排实现,那将有助于避免重复 .

    实现是一个连续的过程,只要输入主题中有新的输入记录并进行处理,就会更新KTable . 因此,在任何时间点都可能存在特定密钥的更新 . 因此,即使您完全控制何时向changelog主题和/或下游发送更新,稍后可能会有新的更新 . 这是流处理的本质 .

    此外,减少主题中持久存储的数据量(增加存储空间),增加网络流量,以及压缩过程中的额外开销以进行清理 .

    如上所述,缓存用于节省资源 .

    可以改进API以实现受控制的物化吗?

    如果提供的KTable语义不符合您的要求,您始终可以将自定义运算符编写为 ProcessorTransformer ,将键值存储附加到其中,并实现您需要的任何内容 .

相关问题