首页 文章

spring-cloud-stream kafka错误处理

提问于
浏览
2

我查看了spring-cloud-stream 1.0.0.RELEASE的文档,但似乎找不到任何有关错误处理的文档 .

基于对kafka 0.9的观察,如果我的消费者抛出RuntimeException,我会看到3次重试 . 在3次重试之后,我在日志中看到了这一点:

2016-05-17 09:35:59.216 ERROR 8983 --- [  kafka-binder-] o.s.i.k.listener.LoggingErrorHandler     : Error while processing: KafkaMessage [Message(magic = 0, attributes = 0, crc = 3731457175, key = null, payload = java.nio.HeapByteBuffer[pos=0 lim=130 cap=130]), KafkaMessageMetadata [offset=2, nextOffset=3, Partition[topic='reservation', id=1]]

org.springframework.messaging.MessagingException: Exception thrown while invoking demo.sink.ReservationConsumer#handleReservation[1 args]; nested exception is java.lang.RuntimeException: no message

此时,消费者偏移量滞后1,如果我重新启动消费者,则会再次重试该消息3次 . 但是,如果我然后将另一条消息发送到同一分区,以便使用者不抛出异常,则消费者偏移量会更新,并且我们抛出异常的原始消息将不会在重新启动后再重试 .

这是在我找不到的地方记录的吗?错误处理特定于绑定程序,还是s-c-s抽象,以便在绑定程序之间保持一致?我怀疑这是如何使用kafka Binders 更新消费者抵消的计划外结果 . 我看到添加了一个enableDlq kafka消费者属性,我即将对此进行测试,但我不确定我们如何处理kafka中的死信 . 我熟悉rabbitmq中的死信队列,但是对于rabbitmq,我们可以使用rabbitmq铲插件来重新发布和重试dlq消息,以涵盖因故障导致临时服务中断的情况 . 我不知道kafka可以使用任何类似的功能,而不是自己编写类似的实用程序 .

更新:启用enableDlq kafka使用者属性的测试显示与错误处理相同的使用者偏移问题 . 当使用者抛出RuntimeException时,我看到3次重试,之后没有记录错误消息,我看到一条消息已发布到 error.<destination>.<group> ,但是消费者偏移量没有更新并且滞后1.如果我重新启动消费者,它尝试再次处理来自原始主题分区的相同失败消息,重试3次并再次将相同的消息放在 error.<destination>.<group> 主题上(重复dlq消息) . 如果我将另一条消息发布到消费者未抛出RuntimeException的同一主题分区,则会更新偏移量,并且在重新启动时不再重试原始失败消息 .

我认为消费者应该在消费者抛出错误时更新kafka中的消费者偏移量,无论enableDlq是否为真 . 这至少会使所有重试失败的消息被丢弃(当enableDlq为false时)或者发布到dlq并且从不重试(当enableDlq为true时) .

1 回答

  • 1

    对我来说看起来像一个错误 - 监听器容器有一个属性 autoCommitOnError (默认情况下为 false ),它不会被 Binders 暴露(或设置) . 在调用错误处理程序(发布错误)后,如果布尔值为true,则提交偏移量 .

    请在github中将其报告为问题 .

相关问题