我试图在反序列化时使用LogAndContinueExceptionHandler . 通过成功记录并继续发生错误,它可以正常工作 . 但是,假设我的传入消息有连续的错误流,我停止并重新启动kafka流应用程序,然后我看到失败并且已经记录在我上次尝试中的消息再次重新出现(它们正在记录再次) . 如果我尝试将错误的消息发送到DLQ,则会出现问题 . 重新启动时,它们会再次发送到DLQ . 一旦我有一个好的记录进入,看起来偏移进一步移动,并没有在另一次重启时再次看到已记录的消息 . 有没有办法在流应用程序中手动提交?我尝试使用ProcessorContext#commit(),但这似乎没有任何效果 .

我通过运行此处提供的示例重现了此行为:https://github.com/confluentinc/kafka-streams-examples/blob/4.0.0-post/src/main/java/io/confluent/examples/streams/WordCountLambdaExample.java

我将传入值Serde更改为 Serdes.Integer().getClass().getName() 以强制输入反序列化错误并将提交间隔减少到仅1秒 . 还在配置中添加了以下内容 .

streamsConfiguration.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, LogAndContinueExceptionHandler.class); .

一旦失败并且当我重新启动应用程序时,相同的记录在再次出现在日志之前失败 . 例如,每次重新启动应用程序时,我都会在控制台上看到以下输出 . 我希望这些不再尝试,因为我们之前已经跳过它们 .

2018-01-27 15:24:37,591 WARN wordcount-lambda-example-client-StreamThread-1 o.a.k.s.p.i.StreamThread:40 - Exception caught during Deserialization, taskId: 0_0, topic: words, partition: 0, offset: 113 org.apache.kafka.common.errors.SerializationException: Size of data received by IntegerDeserializer is not 4 2018-01-27 15:24:37,592 WARN wordcount-lambda-example-client-StreamThread-1 o.a.k.s.p.i.StreamThread:40 - Exception caught during Deserialization, taskId: 0_0, topic: words, partition: 0, offset: 114 org.apache.kafka.common.errors.SerializationException: Size of data received by IntegerDeserializer is not 4

看起来当反序列化异常发生时,此标志在此处永远不会设置为true:https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java#L228 . 似乎只有处理成功才会成为现实 . 这可能就是为什么即使在我手动调用processorContext#commit()之后也没有发生提交的原因 .

感谢对这个主人的任何帮助 .

谢谢 .