我们在2节点集群中运行Kafka(vesion kafka_2.11-0.10.1.0) . 我们有2个 生产环境 者(Java API)代理不同的主题 . 每个主题都有单个分区 . 我们遇到此问题的主题是有一个消费者正在运行 . 这个设置运行良好3个月,我们看到了这个问题 . 其他论坛中针对此问题的所有建议案例/解决方案似乎都不适用于我的方案 .

生产环境 者的例外情况;

-2017-11-25T17:40:33,035 [kafka-producer-network-thread | producer-1] ERROR client.producer.BingLogProducerCallback - 发送消息时遇到异常; > org.apache.kafka.common.errors.NotLeaderForPartitionException:此服务器不是该主题分区的领导者 .

我们没有启用消息的重试,因为这是事务数据,我们希望维护订单 .

制片人配置:

bootstrap.servers : server1ip:9092
acks :all
retries : 0
linger.ms :0
buffer.memory :10240000
max.request.size :1024000
key.serializer : org.apache.kafka.common.serialization.StringSerializer
value.serializer : org.apache.kafka.common.serialization.StringSerializer

我们在 生产环境 者和消费者处都连接到server1 . server2上的控制器日志表明在同一时间发生了一些关机,但我不明白为什么会这样 .

[2017-11-25 17:31:44,776] DEBUG [控制器2]:不在首选副本中的主题Map()(kafka.controller.KafkaController)[2017-11-25 17:31:44,776] TRACE [控制器2 ]:经纪人2的领导者不 balancer 比率是0.000000(kafka.controller.KafkaController)[2017-11-25 17:31:44,776] DEBUG [控制器2]:主题不在首选副本Map()(kafka.controller.KafkaController) [2017-11-25 17:31:44,776] TRACE [控制器2]:经纪人1的领导者不 balancer 比率为0.000000(kafka.controller.KafkaController)[2017-11-25 17:34:18,314] INFO [SessionExpirationListener on 2 ],ZK已过期;关闭所有控制器组件并尝试重新选择(kafka.controller.KafkaController $ SessionExpirationListener)[2017-11-25 17:34:18,317] DEBUG [控制器2]:控制器辞职,经纪人ID 2(kafka.controller.KafkaController) )[2017-11-25 17:34:18,317] DEBUG [Controller 2]:取消注册IsrChangeNotificationListener(kafka.controller.KafkaController)[2017-11-25 17:34:18,317] INFO [delete-topics-thread- 2],关闭(kafka.controller.TopicDeletionManager $ DeleteTopicsThread)[2017-11-25 17:34:18,317] INFO [delete-topics-thread-2],Stopped(kafka.controller.TopicDeletionManager $ DeleteTopicsThread)[2017- 11-25 17:34:18,318] INFO [delete-topics-thread-2],关闭完成(kafka.controller.TopicDeletionManager $ DeleteTopicsThread)[2017-11-25 17:34:18,318] INFO [控制器上的分区状态机2]:停止分区状态机(kafka.controller.PartitionStateMachine)[2017-11-25 17:34:18,318] INFO [控制器2上的副本状态机]:停止的副本状态机(kafka.controller.ReplicaStateMachine)[2017-11-25 17:34:18,318] INFO [Controller-2-to-broker-2-send-thread],关闭(kafka.controller.RequestSendThread)[2017-11- 25 17:34:18,318] INFO [Controller-2-to-broker-2-send-thread],已停止(kafka.controller.RequestSendThread)[2017-11-25 17:34:18,319] INFO [Controller-2-关闭-kerve-2-send-thread],关闭完成(kafka.controller.RequestSendThread)[2017-11-25 17:34:18,319] INFO [Controller-2-to-broker-1-send-thread],关闭down(kafka.controller.RequestSendThread)[2017-11-25 17:34:18,319] INFO [Controller-2-to-broker-1-send-thread],已停止(kafka.controller.RequestSendThread)[2017-11- 25 17:34:18,319] INFO [Controller-2-to-broker-1-send-thread],Shutdown completed(kafka.controller.RequestSendThread)[2017-11-25 17:34:18,319] INFO [Controller 2] :Broker 2辞职作为控制器(kafka.controller.KafkaController)[2017-11-25 17:34:18,353] DEBUG [IsrChangeNotificationListener]被解雇!!! (kafka.controller.IsrChangeNotificationListener)[2017-11-25 17:34:18,353] DEBUG [IsrChangeNotificationListener]被解雇!!! (kafka.controller.IsrChangeNotificationListener)[2017-11-25 17:34:18,354] INFO [Controller 2上的BrokerChangeListener]:针对带子子1,2的路径/经纪人/ ids触发的代理更改侦听器(kafka.controller.ReplicaStateMachine $ BrokerChangeListener )[2017-11-25 17:34:18,355] DEBUG [DeleteTopicsListener on 2]:删除为要删除的主题触发的主题监听器(kafka.controller.PartitionStateMachine $ DeleteTopicsListener)[2017-11-25 17:34:18,362] INFO [AddPartitionsListener on 2]:分区修改触发{“version”:1,“partitions”:{“0”:[1]}}用于路径/代理/主题/ ESQ(kafka.controller.PartitionStateMachine $ PartitionModificationsListener)[2017-11-25 17:34:18,368] INFO [AddPartitionsListener on 2]:触发分区修改{“version”:1,“partitions”:{“0”:[1]}}用于路径/代理/主题/ Test1(kafka.controller.PartitionStateMachine $ PartitionModificationsListener)[2017-11-25 17:34:18,369] INFO [AddPartitionsListener on 2]:触发分区修改{“version”:1,“partitions”:{“0”:[2 ]}} for path / brokers / topics / ImageQ(kafka.controller.PartitionStateMachine $ PartitionModificationsListener)[2017-11-25 17:34:18,374] INFO [AddPartitionsListener on 2]:触发分区修改{“version”:1,“分区 “:{” 8 “:[1,2],” 4 “:[1,2],” 9 “:[2,1],” 5 “:[2,1],” 6" :[1 ,2], “1”:[2,1], “0”:[1,2], “2”:[1,2], “7”:[2,1], “3”:[2 ,1]}} for path / brokers / topics / NMS_NotifyQ(kafka.controller.PartitionStateMachine $ PartitionModificationsListener)[2017-11-25 17:34:18,375] INFO [AddPartitionsListener on 2]:触发分区修改{“version”:1 ,“partitions”:{“0”:[1]}}用于路径/经纪人/主题/ TempBinLogReqQ @