我正在使用Kafka Streams API(KTable,GlobalKTable ..) . 我正在使用KStreams消费Kafka主题 . 我需要根据某些配置过滤掉一些传入的Kafka事件,并在配置更改后再处理它们 . 主题的持久性限制至少为7天 . 以下是要求:
关键 Value 状态
K1 V1已处理
K2 V2未处理(基于某些业务逻辑)
K3 V3已处理
K4 V4已处理
K1 V5已处理------>当前偏移量<--------
现在我想再次处理消息(K2,V2) . 我试图利用Ktables . 但是,不能成功 . 既然,我对这个概念比较陌生,不确定KStream,KTable就能满足这个要求 .
1 回答
看起来你遇到了一个问题,其中一些消息在第一次遇到时是不可处理的,并且你想回来并在以后的某个时间处理它们 .
脑海中浮现的唯一解决方案是将此类消息转发到另一个主题以供稍后处理(此处
branch
函数可能有用),从而允许主流的处理以线性方式继续 .您需要使用自定义处理器来处理延迟主题,该主题可以选择休眠一段时间,或使用其他逻辑来确定何时处理消息 .
但是,这种方法可能仅适用于未处理的消息后来可以按照它们首次遇到的相同顺序处理的情况 . 如果它们不存在,则可能会遇到可处理消息位于延迟队列中的不可处理消息后面的问题 . 您可能能够解决此问题,超时之后,仍会将无法处理的消息发回到主题的末尾 . 但这一切都取决于你的用例 .