首页 文章

Kafka Streams使用依赖对象等待功能

提问于
浏览
0

我创建了一个Kafka Streams应用程序,它接收来自不同主题的不同JSON对象,我想实现某种等待功能,但我不确定如何最好地实现它 .

为了简化问题,我将在下一节中使用简化实体,我希望可以用它来描述问题 . 因此,在我的一个溪流中,我收到了汽车物品,每辆车都有一个身份证 . 在第二个流中,我接收人物对象,并且每个人还具有车辆ID并且被分配给具有该id的车辆 .

我想从两个输入流(主题)中读取我的Kafka Streams应用程序,并使用具有相同车辆ID的四个人来丰富汽车对象 . 只有当所有四个人都被包含在汽车对象中时,才应将汽车对象转发到下一个下游处理器 .

我计划为汽车创建一个输入流,为person对象创建一个输入流,将JSON数据解析为内部对象表示,将两个流合并在一起并在合并流上应用“selectKey”函数以从中提取密钥 . 实体 . 之后,我会将数据推送到自定义转换函数,该函数包含一个状态存储库 . 在这个转换函数中,我会将每个到达的汽车对象及其id存储在状态存储中 . 一旦新的人物物体到达,我会将它们添加到州商店中的相应汽车物体上(请忽略这里迟到的汽车的情况) . 只要四个人在汽车对象中,我就会将对象转发到下一个流函数并将汽车对象从状态存储中移除 .

这是一个合适的方法吗?我不确定可伸缩性,因为我必须确保在运行多个实例时,具有相同id的car和person对象将由同一个应用程序实例处理 . 我会使用selectKey函数,这会起作用吗?

谢谢!

1 回答

相关问题