首页 文章

无法在Spring Boot中使用Kafka消息

提问于
浏览
0

我们有一个使用 org.apache.kafka.clients.consumer.KafkaConsumer 消费Kafka消息的Java应用程序

我们已经创建了一个具有Spring-Kafka依赖关系的Spring Boot应用程序,但是无法读取新项目中的消息 . 已经检查了明显的参数,包括引导服务器的主机名和端口(日志显示被识别),组,主题和Spring Boot,就像原始使用者一样,使用 StringDeserializer . 这是我们的配置文件:

spring:
  kafka:
    bootstrap-servers: hostname1:9092,hostname2:9092
    consumer:
      auto-offset-reset: earliest
      group-id: our_group
      enable-auto-commit: false
      fetch-max-wait: 500
      max-poll-records: 1

kafka:
  topic:
    boot: topic.name

和接收者:

@Component
public class Receiver {

    private static final Logger LOGGER = LoggerFactory.getLogger(Receiver.class);

    private CountDownLatch latch = new CountDownLatch(1);

    public CountDownLatch getLatch() {
        return latch;
    }

    @KafkaListener(topics = "${kafka.topic.boot}")
    public void receive(ConsumerRecord<?, ?> consumerRecord) {
        LOGGER.info("received payload='{}'", consumerRecord.toString());
        latch.countDown();
    }

}

以下是启动Boot应用程序的代码:

@SpringBootApplication
public class EmsDemoUsingSpringBootApplication {

    public static void main(String[] args) {
        SpringApplication.run(EmsDemoUsingSpringBootApplication.class, args);
    }
}

正在捕获此异常:

org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.

有什么明显我忽略了吗?调试这个的最佳方法是什么?

谢谢

2 回答

  • 1

    我希望你可能错过了需要在 @Configuration 文件中指定的 KafkaListenerContainerFactory bean

    @Bean
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>>
    kafkaManualAckListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
      factory.setConsumerFactory(new DefaultKafkaConsumerFactory<String, String>(new HashMap<String,Object>((Map)consumerConfig)));
      factory.setConcurrency(concurrentConsumerCount);
      factory.setBatchListener(true);
      return factory;
    }
    

    也为消费者指定 KafkaListenerContainerFactory 之类的
    @KafkaListener(topics = ("${kafka.topic.boot}"), containerFactory = "kafkaManualAckListenerContainerFactory"

  • 0

    我也有这个问题,发生的事情是我无法连接到服务器 . 您可以在 application.propertiesapplication.yml 中更改日志级别以查看更多详细信息 . 恶魔在日志中..

    logging:
      level:
        root: WARN
        org.springframework: INFO
        org.apache.kafka: DEBUG
    

    我被告知Kafka无法处理名称查找,根据我的经验,连接主机几乎应该始终是FQDN名称(包含域名和所有名称) . 在我的情况下,我认为我没有在我的虚拟框中设置域,并且无法找到我的访客框,即使我们在同一个子网中并且 ping 有效 .

    另外,我为Kafka部分创建了另一个主要类,结果证明是错误的 . 这不是一个好习惯,您应该使用 @EnableKafka 注释应用程序主类,只需将设置放在yml文件中,就应该加载它们 . 不需要其他配置类 .

    我的消费者:

    import lombok.extern.slf4j.Slf4j;
    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.messaging.handler.annotation.Payload;
    import org.springframework.stereotype.Component;
    
    @Component
    @Slf4j
    public class KafkaConsumer {
        @KafkaListener(topics={"testtopic"})
        public void listen(@Payload String message) {
            log.info("Received message is {}", message);
        }
    }
    

    我的应用程序:

    import lombok.extern.slf4j.Slf4j;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.boot.autoconfigure.security.servlet.SecurityAutoConfiguration;
    import org.springframework.kafka.annotation.EnableKafka;
    
    @Slf4j
    @SpringBootApplication(exclude = { SecurityAutoConfiguration.class })
    @EnableKafka    // <----- I only had to add this line
    public class SomeApplication {
    
        public static void main(String[] args) {
            SpringApplication.run(SomeApplication.class, args);
            log.info("Application launched. ");
        }
    }
    

    我的配置yml:

    logging:
      level:
        root: WARN
        org.springframework: INFO
        org.apache.kafka: DEBUG
    
    spring:
      kafka:
        bootstrap-servers: <FQDN names here:9092>
        consumer:
          group-id: <unique-group-id>
          enable-auto-commit: false # never ack messsage when it is received.
        listener:
          ack-mode: manual # I am responsible to ack the messages
    

    并启动该应用程序 . 就这样 .

相关问题