首页 文章

合并多个相同的Kafka Streams主题

提问于
浏览
7

我有2个Kafka主题从不同来源流式传输完全相同的内容,因此我可以获得高可用性,以防其中一个来源失败 . 我正在尝试使用Kafka Streams 0.10.1.0将2个主题合并为1个输出主题,这样我就不会错过任何有关失败的消息,并且在所有源都启动时没有重复 .

当使用KStream的 leftJoin 方法时,其中一个主题可以没有问题(次要主题),但是当主要主题发生故障时,不会向输出主题发送任何内容 . 这似乎是因为,根据Kafka Streams developer guide

KStream-KStream leftJoin始终由来自主流的记录驱动

因此,如果没有来自主流的记录,它将不会使用辅助流中的记录,即使它们存在 . 主流重新联机后,输出将恢复正常 .

我也尝试使用 outerJoin (添加重复记录),然后转换为KTable和groupByKey以消除重复,

KStream mergedStream = stream1.outerJoin(stream2, 
    (streamVal1, streamVal2) -> (streamVal1 == null) ? streamVal2 : streamVal1,
    JoinWindows.of(2000L))

mergedStream.groupByKey()
            .reduce((value1, value2) -> value1, TimeWindows.of(2000L), stateStore))
            .toStream((key,value) -> value)
            .to(outputStream)

但我偶尔也会重复一遍 . 我也使用 commit.interval.ms=200 来让KTable经常发送到输出流 .

接近此合并以从多个相同的输入主题获得一次输出的最佳方法是什么?

1 回答

相关问题