
Correctly managing a DLQ in Spring Cloud Stream Kafka


I want to manage a DLQ (dead-letter queue) in Spring Cloud Stream using Kafka.

application.yaml

server:
    port: 8091
eureka:
    client:
        serviceUrl:
            defaultZone: http://IP:8761/eureka
spring:
    application:
        name: employee-consumer
    cloud:
        stream:
            kafka:
                binder:
                    brokers: IP:9092
                bindings:
                    greetings-in:
                        destination: greetings
                        contentType: application/json
                    greetings-out:
                        destination: greetings
                        contentType: application/json
            bindings:
                greetings-out:
                    consumer:
                        enableDlq: true
                        dlqName: dead-out
    kafka:
      consumer:
        group-id: A

As you can see in my configuration, I enabled the DLQ and set a name for the DLQ topic.

To test the DLQ behavior, I throw an exception on certain messages.

My listener component

@StreamListener("greetings-out")
public void handleGreetingsInput(@Payload Greetings greetings) throws Exception {
    logger.info("Greetings input -> {}", greetings);
    if (greetings.getMessage().equals("ciao")) {
        throw new Exception("eer");
    }
}

With this, any message equal to "ciao" throws an exception, and in the logs I see it being processed three times:

2018-07-09 13:19:57.256  INFO 1 --- [container-0-C-1] com.mitro.service.GreetingsListener      : Greetings input -> com.mitro.model.Greetings@3da9d701[timestamp=0,message=ciao]
2018-07-09 13:19:58.259  INFO 1 --- [container-0-C-1] com.mitro.service.GreetingsListener      : Greetings input -> com.mitro.model.Greetings@5bd62aaf[timestamp=0,message=ciao]
2018-07-09 13:20:00.262  INFO 1 --- [container-0-C-1] com.mitro.service.GreetingsListener      : Greetings input -> com.mitro.model.Greetings@c26f92b[timestamp=0,message=ciao]
2018-07-09 13:20:00.266 ERROR 1 --- [container-0-C-1] o.s.integration.handler.LoggingHandler   : org.springframework.messaging.MessagingException: Exception thrown while invoking com.mitro.service.GreetingsListener#handleGreetingsInput[1 args]; nested exception is java.lang.Exception: eer, failedMessage=GenericMessage [payload=byte[32], headers={kafka_offset=3, scst_nativeHeadersPresent=true, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@510302cb, deliveryAttempt=3, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, contentType=application/json, kafka_receivedTopic=greetings-out, kafka_receivedTimestamp=1531142397248}]
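
For reference, the three deliveries in this log are spaced roughly 1 s and 2 s apart, which is what the default consumer retry settings would produce (maxAttempts=3, backOffInitialInterval=1000, backOffMultiplier=2.0, backOffMaxInterval=10000). The following is a minimal plain-Java sketch of that schedule, assuming those defaults. The class and method names are hypothetical, and this is not the binder's own code.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: models the waits between delivery attempts under
// exponential back-off. It is not part of Spring Cloud Stream.
class RetryScheduleSketch {

    // Returns the wait (in ms) before each redelivery.
    // maxAttempts counts the first delivery, so there are maxAttempts - 1 waits.
    static List<Long> delaysMillis(int maxAttempts, long initialInterval,
                                   double multiplier, long maxInterval) {
        List<Long> delays = new ArrayList<>();
        double next = initialInterval;
        for (int attempt = 2; attempt <= maxAttempts; attempt++) {
            delays.add((long) Math.min(next, maxInterval)); // cap each wait
            next *= multiplier;                             // grow for the next retry
        }
        return delays;
    }

    public static void main(String[] args) {
        // Assumed defaults: 3 attempts, 1000 ms initial, x2.0, capped at 10000 ms.
        System.out.println(delaysMillis(3, 1000L, 2.0, 10000L)); // prints [1000, 2000]
    }
}
```

Only after the last attempt fails is the record handed to the error path, which is where DLQ publishing would happen if it were configured correctly.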

This is fine with me, but I don't understand why no topic named dead-out appears (see the image below).
topics' view

What am I doing wrong?

EDIT 1: (still doesn't create topic for DLQ)

server:
    port: 8091
eureka:
    client:
        serviceUrl:
            defaultZone: http://IP:8761/eureka
spring:
    application:
        name: employee-consumer
    cloud:
        stream:
            kafka:
                streams:
                    binder:
                        serdeError: sendToDlq
                binder:
                    brokers: IP:9092
                    auto-create-topics: true
                bindings:
                    greetings-out:
                        destination: greetings-out
                        contentType: application/json
                        consumer:
                          enableDql: true
                          dlqName: dead-out
                          autoCommitOnError: true
                          autoCommitOffset: true
            bindings:
                greetings-out:
                    destination: greetings-out
                    contentType: application/json
                    consumer:
                        enableDlq: true
                        dlqName: dead-out
                        autoCommitOnError: true
                        autoCommitOffset: true
    kafka:
      consumer:
        group-id: A


1 Answer


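    It looks like your properties are reversed. The common properties (destination, contentType) must be under spring.cloud.stream.bindings. The Kafka-specific properties (enableDlq, dlqName) must be under spring.cloud.stream.kafka.bindings.

    In your configuration they are the other way around.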
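
To make the split concrete, here is a small plain-Java sketch that builds the flat property keys for one consumer binding. The class and method names are hypothetical. The binding name and values are taken from the question, and only the four properties named above are covered.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical illustration of which prefix each property belongs under.
class BindingKeysSketch {

    // Common (binder-agnostic) binding properties live here.
    static final String COMMON = "spring.cloud.stream.bindings.";
    // Kafka-specific binding properties live here.
    static final String KAFKA = "spring.cloud.stream.kafka.bindings.";

    // Builds the flat keys for a single consumer binding.
    static Map<String, String> consumerProperties(String binding, String destination,
                                                  String dlqName) {
        Map<String, String> props = new LinkedHashMap<>();
        props.put(COMMON + binding + ".destination", destination);
        props.put(COMMON + binding + ".contentType", "application/json");
        props.put(KAFKA + binding + ".consumer.enableDlq", "true");
        props.put(KAFKA + binding + ".consumer.dlqName", dlqName);
        return props;
    }

    public static void main(String[] args) {
        consumerProperties("greetings-out", "greetings-out", "dead-out")
                .forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

The same keys can be written as nested YAML. What matters is that enableDlq and dlqName end up under the kafka.bindings branch, inside the consumer block.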

    EDIT

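    There are two problems with your (revised) configuration.

    • A typo: enableDql instead of enableDlq

    • There is no group. You cannot have a DLQ with an anonymous consumer:

    Caused by: java.lang.IllegalArgumentException: DLQ support is not available for anonymous subscriptions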
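
The rule behind that exception can be sketched in plain Java as follows. This is a hypothetical stand-in for the binder's startup check, written only to illustrate the rule. The class and method names are assumptions, and treating a blank group as anonymous is an assumption of this sketch.

```java
// Hypothetical stand-in for the binder's check. This is not Spring code.
class DlqGroupCheckSketch {

    // Rejects the combination "no consumer group" + "DLQ enabled".
    static void validate(String group, boolean enableDlq) {
        // Assumption of this sketch: a missing or blank group means anonymous.
        boolean anonymous = (group == null || group.trim().isEmpty());
        if (anonymous && enableDlq) {
            throw new IllegalArgumentException(
                    "DLQ support is not available for anonymous subscriptions");
        }
    }

    public static void main(String[] args) {
        validate("so51247113", true); // a named group with DLQ enabled passes
        try {
            validate(null, true);     // an anonymous consumer with DLQ enabled fails
        } catch (IllegalArgumentException e) {
            System.out.println("Rejected: " + e.getMessage());
        }
    }
}
```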

    The following configuration works fine:

    spring:
      application:
        name: employee-consumer
      cloud:
        stream:
          kafka:
            binder:
              brokers: localhost:9092
              auto-create-topics: true
            bindings:
              input:
                consumer:
                  enableDlq: true
                  dlqName: dead-out
                  autoCommitOnError: true
                  autoCommitOffset: true
          bindings:
            input:
              group: so51247113
              destination: greetings-out
              contentType: application/json
    

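    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.cloud.stream.annotation.EnableBinding;
    import org.springframework.cloud.stream.annotation.StreamListener;
    import org.springframework.cloud.stream.messaging.Sink;
    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.messaging.Message;

    @SpringBootApplication
    @EnableBinding(Sink.class)
    public class So51247113Application {

        public static void main(String[] args) {
            SpringApplication.run(So51247113Application.class, args);
        }

        // Always fails, so every record ends up in the DLQ once retries are exhausted.
        @StreamListener(Sink.INPUT)
        public void in(String in) {
            System.out.println(in);
            throw new RuntimeException("fail");
        }

        // Plain Spring Kafka listener that reads the dead-letter topic.
        @KafkaListener(id = "foo", topics = "dead-out")
        public void dlq(Message<?> in) {
            System.out.println("DLQ:" + in);
        }

    }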
    
