我们有一个使用 org.apache.kafka.clients.consumer.KafkaConsumer
消费Kafka消息的Java应用程序
我们已经创建了一个具有Spring-Kafka依赖关系的Spring Boot应用程序,但是无法读取新项目中的消息 . 已经检查了明显的参数,包括引导服务器的主机名和端口(日志显示被识别),组,主题和Spring Boot,就像原始使用者一样,使用 StringDeserializer
. 这是我们的配置文件:
spring:
kafka:
bootstrap-servers: hostname1:9092,hostname2:9092
consumer:
auto-offset-reset: earliest
group-id: our_group
enable-auto-commit: false
fetch-max-wait: 500
max-poll-records: 1
kafka:
topic:
boot: topic.name
和接收者:
@Component
public class Receiver {
private static final Logger LOGGER = LoggerFactory.getLogger(Receiver.class);
private CountDownLatch latch = new CountDownLatch(1);
public CountDownLatch getLatch() {
return latch;
}
@KafkaListener(topics = "${kafka.topic.boot}")
public void receive(ConsumerRecord<?, ?> consumerRecord) {
LOGGER.info("received payload='{}'", consumerRecord.toString());
latch.countDown();
}
}
以下是启动Boot应用程序的代码:
@SpringBootApplication
public class EmsDemoUsingSpringBootApplication {
public static void main(String[] args) {
SpringApplication.run(EmsDemoUsingSpringBootApplication.class, args);
}
}
正在捕获此异常:
org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.
有什么明显我忽略了吗?调试这个的最佳方法是什么?
谢谢
2 回答
我希望你可能错过了需要在
@Configuration
文件中指定的KafkaListenerContainerFactory
bean也为消费者指定
KafkaListenerContainerFactory
之类的@KafkaListener(topics = ("${kafka.topic.boot}"), containerFactory = "kafkaManualAckListenerContainerFactory"
我也有这个问题,发生的事情是我无法连接到服务器 . 您可以在
application.properties
或application.yml
中更改日志级别以查看更多详细信息 . 恶魔在日志中..我被告知Kafka无法处理名称查找,根据我的经验,连接主机几乎应该始终是FQDN名称(包含域名和所有名称) . 在我的情况下,我认为我没有在我的虚拟框中设置域,并且无法找到我的访客框,即使我们在同一个子网中并且
ping
有效 .另外,我为Kafka部分创建了另一个主要类,结果证明是错误的 . 这不是一个好习惯,您应该使用
@EnableKafka
注释应用程序主类,只需将设置放在yml文件中,就应该加载它们 . 不需要其他配置类 .我的消费者:
我的应用程序:
我的配置yml:
并启动该应用程序 . 就这样 .