我有2个主题,订单和 生产环境 ,我需要通过时间窗口加入: Join A and B if: Order.Timestamp < Production.Timestamp < Order.Timestamp + 5 Minutes
基本上:如果订单到货,接下来5分钟内的每个 生产环境 条目都对应于此订单,应该加入 .
每个时间戳都在有效负载中,每X个小时都会在批处理中填充这两个主题 .
部分解决方案:
我为两个流创建临时流A1,B1,并为每条记录生成带有更改时间戳的消息: prod.send(new ProducerRecord<>(topic, null, milliTimestampFromPayload, key, record));
现在两个流都是正确的时间编码,但时间戳是过去的 .
我可以通过JoinWindow加入他们: streamA.join(streamB, (recordA, recordB) -> recordB.Id() + ":" + recordA.Id(), JoinWindows.of(TimeUnit.MINUTES.toMillis(5)), Joined.with(...));
问题:
-
如果批处理的时间戳不同步,则不起作用
-
时间戳已过去并手动设置?这是正确的方法吗?
-
JoinWindows在语义上并不完全正确 .