首页 文章

Kafka Streams - 在同一主题上获得KTable和KStream的最佳方式?

提问于
浏览
3

我对Kafka Streams有一个问题(0.10.1.1) . 我正在尝试在同一主题上创建 KStreamKTable .

我尝试的第一种方法是简单地在同一主题上调用流和表的 KStreamBuilder 方法 . 这导致了

org.apache.kafka.streams.errors.TopologyBuilderException: Invalid topology building: Topic <topicName> has already been registered by another source.

好吧,这似乎是Kafka Streams内置的一些限制 .

我的第二种方法是最初创建 KTable 并使用 toStream() 方法 . 这有一个问题, KTables 做了一些内部缓冲/刷新,因此如果一个键出现多次,输出流不会反映所有输入元素,如我的例子所示 . 这是一个问题,因为我正在计算密钥的出现次数 .

似乎有效的方法是最初创建一个 KStream ,按键对其进行分组,然后通过丢弃旧聚合并仅保留新值来对其进行分组 . 我对这种方法不太满意,因为a)看起来非常复杂,而且b) Reducer 接口没有指定哪一个是已经聚合的值,哪一个是新的 . 我参加了 Session 并保留了第二个,但是......嗯 .

所以问题归结为:有更好的方法吗?我错过了一些非常明显的东西吗?

请记住,我没有处理正确的用例 - 这只是我了解Streams-API .

1 回答

  • 3

    关于两次添加主题:这是不可能的,因为Kafka Streams应用程序是单个“消费者组”,因此只能一次为主题提交偏移,而添加主题两次则表明主题获取消费者两次(和独立进步) .

    对于方法 KTable#toStream() ,您可以通过 StreamsConfig 参数 cache.max.bytes.buffering == 0 禁用缓存 . 但是,这是一个全局设置,并禁用所有 KTable 的缓存/重复数据删除(参见http://docs.confluent.io/current/streams/developer-guide.html#memory-management) .

    更新:从Kafka 0.11开始,可以通过Materialized参数单独禁用每个KTable的缓存 .

    groupBy 方法也可以工作,即使它需要一些样板 . 我们考虑将 KStream#toTable() 添加到API以简化此转换 . 是的, reduce 中的第二个参数是新值 - 因为reduce是用于组合两个值,API没有"old"和"new"的概念,因此参数没有这样的命名 .

相关问题