首页 文章

Spring 天 Kafka 和 Kafka 群集

提问于
浏览
0

我在群集中配置了3个kafka,我正在尝试使用spring-kafka .

但在我杀死kafka领导后,我无法将其他消息发送到队列中 .

我将spring.kafka.bootstrap-servers属性设置为:“kafka-1:9092; kafka-2:9093,kafka-3:9094”以及我的hosts文件中的所有名称 .

Kafka 版0.10

有人知道如何正确配置?

Edit

我测试了一件事并发生了一种奇怪的行为 . 当我启动服务时,我向主题发送消息(强制创建)

码:

@Bean
public KafkaSyncListener synchronousListener(MessageSender sender, KafkaProperties prop) {
    sender.send(prop.getSynchronousTopic(), "Message to force create the topic! Run, Forrest, Run!");
    return new KafkaSyncListener();
}

所以,在这个时候我没有启动kafka-1服务器(只是其他服务器)并且它发生了异常:

org.springframework.kafka.core.KafkaProducerException:发送失败;嵌套异常是org.apache.kafka.common.errors.TimeoutException:60000毫秒后无法更新元数据 .

看起来spring-kafka只是尝试连接第一个bootstrap服务器 . 我正在使用spring-kafka 1.3.5.RELEASE和kafka 0.10.1.1

Edit 2

我做过你做过的测试 . 当我删除领导者已经改变的第一个泊坞容器(kafka-1)时,它会发生相同的情况 . 所以,我的消费者( Spring 季服务)无法使用消息 . 但是,当我再次启动kafka-1时,服务获取所有消息我的消费者ConcurrentKafkaListenerContainerFactory:

{
  key.deserializer=class
  org.apache.kafka.common.serialization.IntegerDeserializer,
  value.deserializer=class
  org.apache.kafka.common.serialization.StringDeserializer,
  max.poll.records=500,
  group.id=mongo-adapter-service,
  ssl.keystore.location=/certs/kafka.keystore.jks,
  bootstrap.servers=[kafka-2:9093, kafka-1:9092, kafka-3:9094],
auto.commit.interval.ms=100,
security.protocol=SSL,
max.request.size=5242880,
ssl.truststore.location=/certs/kafka.keystore.jks,
auto.offset.reset=earliest
}

1 回答

  • 2

    您需要在服务器地址之间使用逗号,而不是分号 .

    EDIT

    我刚刚运行了一个没有问题的测试:

    spring.kafka.bootstrap-servers=localhost:9092,localhost:9093,localhost:9094
    

    @SpringBootApplication
    public class So50804678Application {
    
        public static void main(String[] args) {
            SpringApplication.run(So50804678Application.class, args);
        }
    
        @KafkaListener(id = "foo", topics = "so50804678")
        public void in(String in) {
            System.out.println(in);
        }
    
        @Bean
        public NewTopic topic() {
            return new NewTopic("so50804678", 1, (short) 3);
        }
    
    }
    

    $ kafka-topics --zookeeper localhost:2181 --describe --topic so50804678
    Topic:so50804678    PartitionCount:1    ReplicationFactor:3 Configs:
        Topic: so50804678   Partition: 0    Leader: 0   Replicas: 0,1,2 Isr: 0,1,2
    

    杀死了领导者,并且

    $ kafka-topics --zookeeper localhost:2181 --describe --topic so50804678
    Topic:so50804678    PartitionCount:1    ReplicationFactor:3 Configs:
        Topic: so50804678   Partition: 0    Leader: 1   Replicas: 0,1,2 Isr: 1,2
    

    $ kafka-console-producer --broker-list localhost:9092,localhost:9093,localhost:9093 --topic so50804678
    

    发送了一条消息,应用程序收到了该消息;除WARN外,日志中没有错误:

    [Consumer clientId = consumer-1,groupId = foo]无法 Build 与节点0的连接 . 经纪人可能无法使用 .

    然后我重新启动了死服务器;停止了我的应用;然后添加了这段代码......

    @Bean
    public ApplicationRunner runner(KafkaTemplate<String, String> template) {
        return args -> {
            while(true) {
                System.out.println(template.send("so50804678", "foo").get().getRecordMetadata());
                Thread.sleep(3_000);
            }
        };
    }
    

    再一次,杀死现任领导人没有任何影响;一切恢复好了 .

    您可能需要调整服务器道具中的listeners / advertised.listeners属性 . 由于我的经纪人都在本地主机上,因此我将其保留为默认值 .

相关问题