我有2个主题,订单和 生产环境 ,我需要通过时间窗口加入: Join A and B if: Order.Timestamp < Production.Timestamp < Order.Timestamp + 5 Minutes 基本上:如果订单到货,接下来5分钟内的每个 生产环境 条目都对应于此订单,应该加入 .

每个时间戳都在有效负载中,每X个小时都会在批处理中填充这两个主题 .

部分解决方案:

我为两个流创建临时流A1,B1,并为每条记录生成带有更改时间戳的消息: prod.send(new ProducerRecord<>(topic, null, milliTimestampFromPayload, key, record));

现在两个流都是正确的时间编码,但时间戳是过去的 .

我可以通过JoinWindow加入他们: streamA.join(streamB, (recordA, recordB) -> recordB.Id() + ":" + recordA.Id(), JoinWindows.of(TimeUnit.MINUTES.toMillis(5)), Joined.with(...));

问题:

  • 如果批处理的时间戳不同步,则不起作用

  • 时间戳已过去并手动设置?这是正确的方法吗?

  • JoinWindows在语义上并不完全正确 .