首页 文章

Kafka Stream 和 Kafka 表一对多的关系加入

提问于
浏览
3

我有一个 kafka 流 - 比如博客和 kafka 表 - 说与这些博客相关的评论。来自 kafka 流的密钥可以映射到 Kafka 表 i.e 中的多个值。一个博客可以有多个评论。我想要连接这两个并使用注释 id 数组创建一个新对象。但是当我进行连接时,流只包含最后一个注释 ID。是否有任何文档或示例代码可以指出我正确的方向如何实现这一目标?基本上,是否有任何文档阐述如何使用 Kafka 流和 Kafka 表进行一对多关系连接?

KStream<Integer, EnrichedBlog> joinedBlogComments = blogsStream.join(commentsTbl,
              (blogId, blog) -> blog.getBlogId(),
              (blog, comment) -> new EnrichedBlog(blog, comment));

所以不是评论 - 我需要有一系列评论 ID。

2 回答

  • 4

    我找不到在您的代码示例中使用签名匹配的连接方法,但这是我认为的问题:

    KTable 被解释为 changlog,也就是说,具有相同键的每个下一个消息都被解释为对记录的更新,而不是新记录。这就是为什么您只看到给定键(博客 ID)的最后一条“评论”消息,之前的值被覆盖。要解决这个问题,您首先需要更改填充 KTable 的方式。您可以做的是将您的评论主题作为 KStream 添加到拓扑中,然后执行聚合,该聚合只是构建一个数组或共享相同博客 ID 的注释列表。该聚合返回一个 KTable,您可以将其加入您的博客 KStream。

    这是一个如何构建 List-valued KTable 的草图:

    builder.stream("yourCommentTopic") // where key is blog id
    .groupByKey()
    .aggregate(() -> new ArrayList(), 
        (key, value, agg) -> new KeyValue<>(key, agg.add(value)),
        yourListSerde);
    

    列表在聚合中比在数组中更容易使用,因此我建议您在需要时将其转换为下游数组。您还需要为列表提供 serde 实现,在上面的示例中为“yourListSerde”。

  • 2

    如果您使用带有架构注册表的 avro,则应编写自己的聚合器,因为 kafka 流无法序列化 ArrayList。

    val kTable = aStream
            .groupByKey()
            .aggregate(
                    {
                        YourAggregator() // initialize aggregator
                    },
                    { _, value, agg ->
                        agg.add(value) // add value to a list in YourAggregator
                        agg
                    }
            )
    

    然后将kTable与您的其他流(bStream)一起加入。

    bStream
            .join(
                    kTable,
                    { b, a ->
                        // do your value join from a to b
                        b
                    }
            )
    

    对不起,我的片段是用 Kotlin 写的。

相关问题