We have a Kafka cluster of 6 nodes, 5 of which also host ZooKeeper.

A Spark Streaming job reads from a streaming server, does some processing, and sends the results to Kafka.

From time to time the Spark job gets stuck, no data is sent to Kafka, and the job restarts.

The job keeps stalling and restarting until we manually restart the Kafka cluster. After restarting Kafka, everything runs smoothly.

Checking the Kafka logs, we found the following exception thrown several times:

2017-03-10 05:12:14,177 ERROR state.change.logger: Controller 133 epoch 616 initiated state change for partition [live_stream_2,52] from OfflinePartition to OnlinePartition failed
kafka.common.NoReplicaOnlineException: No broker in ISR for partition [gnip_live_stream_2,52] is alive. Live brokers are: [Set(133, 137, 134, 135, 143)], ISR brokers are: [142] 
    at kafka.controller.OfflinePartitionLeaderSelector.selectLeader(PartitionLeaderSelector.scala:66)
    at kafka.controller.PartitionStateMachine.electLeaderForPartition(PartitionStateMachine.scala:345)
    at kafka.controller.PartitionStateMachine.kafka$controller$PartitionStateMachine$$handleStateChange(PartitionStateMachine.scala:205)
    at kafka.controller.PartitionStateMachine$$anonfun$triggerOnlinePartitionStateChange$3.apply(PartitionStateMachine.scala:120)
    at kafka.controller.PartitionStateMachine$$anonfun$triggerOnlinePartitionStateChange$3.apply(PartitionStateMachine.scala:117)
    at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:778)
    at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
    at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
    at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:230)
    at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
    at scala.collection.mutable.HashMap.foreach(HashMap.scala:99)
    at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:777)
    at kafka.controller.PartitionStateMachine.triggerOnlinePartitionStateChange(PartitionStateMachine.scala:117)
    at kafka.controller.PartitionStateMachine.startup(PartitionStateMachine.scala:70)
    at kafka.controller.KafkaController.onControllerFailover(KafkaController.scala:333)
    at kafka.controller.KafkaController$$anonfun$1.apply$mcV$sp(KafkaController.scala:164)
    at kafka.server.ZookeeperLeaderElector.elect(ZookeeperLeaderElector.scala:84)
    at kafka.server.ZookeeperLeaderElector$LeaderChangeListener$$anonfun$handleDataDeleted$1.apply$mcZ$sp(ZookeeperLeaderElector.scala:146)
    at kafka.server.ZookeeperLeaderElector$LeaderChangeListener$$anonfun$handleDataDeleted$1.apply(ZookeeperLeaderElector.scala:141)
    at kafka.server.ZookeeperLeaderElector$LeaderChangeListener$$anonfun$handleDataDeleted$1.apply(ZookeeperLeaderElector.scala:141)
    at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:259)
    at kafka.server.ZookeeperLeaderElector$LeaderChangeListener.handleDataDeleted(ZookeeperLeaderElector.scala:141)
    at org.I0Itec.zkclient.ZkClient$9.run(ZkClient.java:823)
    at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)

The above exception is thrown for a topic that is not in use (live_stream_2), but it is also thrown for topics that are in use.

Here is the exception for a topic that is in use:

2017-03-10 12:05:18,535 ERROR state.change.logger: Controller 133 epoch 620 initiated state change for partition [gnip_live_stream,3] from OfflinePartition to OnlinePartition failed
kafka.common.NoReplicaOnlineException: No broker in ISR for partition [live_stream,3] is alive. Live brokers are: [Set(133, 134, 135, 137)], ISR brokers are: [136] 
    at kafka.controller.OfflinePartitionLeaderSelector.selectLeader(PartitionLeaderSelector.scala:66)
    at kafka.controller.PartitionStateMachine.electLeaderForPartition(PartitionStateMachine.scala:345)
    at kafka.controller.PartitionStateMachine.kafka$controller$PartitionStateMachine$$handleStateChange(PartitionStateMachine.scala:205)
    at kafka.controller.PartitionStateMachine$$anonfun$triggerOnlinePartitionStateChange$3.apply(PartitionStateMachine.scala:120)
    at kafka.controller.PartitionStateMachine$$anonfun$triggerOnlinePartitionStateChange$3.apply(PartitionStateMachine.scala:117)
    at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:778)
    at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
    at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
    at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:230)
    at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
    at scala.collection.mutable.HashMap.foreach(HashMap.scala:99)
    at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:777)
    at kafka.controller.PartitionStateMachine.triggerOnlinePartitionStateChange(PartitionStateMachine.scala:117)
    at kafka.controller.PartitionStateMachine.startup(PartitionStateMachine.scala:70)
    at kafka.controller.KafkaController.onControllerFailover(KafkaController.scala:333)
    at kafka.controller.KafkaController$$anonfun$1.apply$mcV$sp(KafkaController.scala:164)
    at kafka.server.ZookeeperLeaderElector.elect(ZookeeperLeaderElector.scala:84)
    at kafka.server.ZookeeperLeaderElector$LeaderChangeListener$$anonfun$handleDataDeleted$1.apply$mcZ$sp(ZookeeperLeaderElector.scala:146)
    at kafka.server.ZookeeperLeaderElector$LeaderChangeListener$$anonfun$handleDataDeleted$1.apply(ZookeeperLeaderElector.scala:141)
    at kafka.server.ZookeeperLeaderElector$LeaderChangeListener$$anonfun$handleDataDeleted$1.apply(ZookeeperLeaderElector.scala:141)
    at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:259)
    at kafka.server.ZookeeperLeaderElector$LeaderChangeListener.handleDataDeleted(ZookeeperLeaderElector.scala:141)
    at org.I0Itec.zkclient.ZkClient$9.run(ZkClient.java:823)
    at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)

The first exception says the ISR broker list for partition 52 contains only the broker with id 142, which is strange because the cluster has no broker with that id.

The second exception says the ISR broker list for partition 3 contains only the broker with id 136, which does not appear in the live brokers list.
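
One way to check what the controller currently records for these partitions is to describe the topics and compare the Leader/Isr columns against the live brokers. A minimal sketch, assuming 0.10-era tooling (which matches the ZkClient-based stack traces above) and a ZooKeeper ensemble reachable at zk1:2181, which is a placeholder; the topic names are taken from the exception messages, which show them both with and without the gnip_ prefix:

    # Print leader, replicas, and ISR for every partition of the in-use topic
    bin/kafka-topics.sh --describe --zookeeper zk1:2181 --topic gnip_live_stream

    # Same for the unused topic from the first exception
    bin/kafka-topics.sh --describe --zookeeper zk1:2181 --topic gnip_live_stream_2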

I suspect there is stale data in ZooKeeper causing the first exception, and that broker 136 was down at that particular time for some reason, causing the second exception.
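
If stale ISR data really is sitting in ZooKeeper, it should be visible in the znodes Kafka maintains there. A minimal sketch using the standard ZooKeeper CLI in one-shot mode (zk1:2181 is again a placeholder; the paths are the ones Kafka writes):

    # Broker ids currently registered (ephemeral znodes; a dead broker such as 136,
    # or a non-existent one such as 142, should not be listed here)
    bin/zkCli.sh -server zk1:2181 ls /brokers/ids

    # Persisted leader/ISR state for partition 52 of the topic from the first exception
    bin/zkCli.sh -server zk1:2181 get /brokers/topics/gnip_live_stream_2/partitions/52/state

    # Which broker is currently acting as controller
    bin/zkCli.sh -server zk1:2181 get /controller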

My questions:

1- Could these exceptions be the reason Kafka (and consequently the Spark job) gets stuck?

2- How can this be solved?