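I'm facing a strange problem with my Kafka producer. I use the kafka-0.11 server/client version. I have one ZooKeeper node and one Kafka broker node. I also created an "events" topic with 3 partitions: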

Topic:events    PartitionCount:3        ReplicationFactor:1     Configs:
        Topic: events   Partition: 0    Leader: 0       Replicas: 0     Isr: 0
        Topic: events   Partition: 1    Leader: 0       Replicas: 0     Isr: 0
        Topic: events   Partition: 2    Leader: 0       Replicas: 0     Isr: 0

In my Java code I create the producer with the following properties:

Properties props = new Properties();
props.put(BOOTSTRAP_SERVERS_CONFIG, brokerUrl);
props.put(MAX_BLOCK_MS_CONFIG, 30000);
props.put(KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(PARTITIONER_CLASS_CONFIG, KafkaCustomPartitioner.class);
this.producer = new KafkaProducer<>(props);

In addition, I pass a callback to Producer#send() that adds failed messages to a queue, which a separate "resender" thread iterates over in a loop:

this.producer.send(producerRecord, new ProducerCallback(producerRecord.value(), topic));

private class ProducerCallback implements Callback {
  private final String message;
  private final String topic;

  public ProducerCallback(String message, String topic) {
    this.message = message;
    this.topic = topic;
  }

  @Override
  public void onCompletion(RecordMetadata metadata, Exception ex) {
    if (ex != null) {
        logger.error("Kafka producer error. Topic: " + topic +
                ".Message will be added into failed messages queue.", ex);
        failedMessagesQueue.enqueue(SerializationUtils.serialize(new FailedMessage(topic, message)));
    }
  }
}

private class ResenderThread extends Thread {
    private volatile boolean running = true;

    public void stopGracefully() {
        running = false;
    }

    @Override
    public void run() {
        while (running) {
            try {
                byte[] val = failedMessagesQueue.peek();
                if (val != null) {
                    FailedMessage failedMessage = SerializationUtils.deserialize(val);
                    ProducerRecord<String, String> record;
                    if (topic.equals(failedMessage.getTopic())) {
                        String messageKey = generateMessageKey(failedMessage.getMessage());
                        record = createProducerRecordWithKey(failedMessage.getMessage(), messageKey, failedMessage.getTopic());
                    } else {
                        record = new ProducerRecord<>(failedMessage.getTopic(), failedMessage.getMessage());
                    }
                    try {
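                        // Block until the broker acknowledges the record. Use the enclosing
                        // service's producer: "this" here is the ResenderThread, which
                        // declares no producer field in the code shown.
                        producer.send(record).get();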
                        failedMessagesQueue.dequeue();
                    } catch (Exception e) {
                        logger.debug("Kafka message resending attempt was failed. Topic " + failedMessage.getTopic() +
                                " Partition. " + record.partition() + ". " + e.getMessage());
                    }
                }

                Thread.sleep(200);
            } catch (Exception e) {
                logger.error("Error resending an event", e);
                break;
            }
        }
    }
}
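
To make the resend pattern above easier to reason about in isolation, here is a minimal Kafka-free sketch of the same peek/send/dequeue loop. Everything in it is hypothetical and not part of my service: the class name `ResendLoopSketch`, the `sender` predicate (standing in for `producer.send(record).get()` completing without an exception), the `maxAttempts` limit, and the 5-second cap. It also doubles the delay after each consecutive failure instead of sleeping a fixed 200 ms, which is an assumption about what might be gentler on a broker that is down, not something I have measured.

```java
import java.util.Deque;
import java.util.function.Predicate;

/** Hypothetical sketch: peek, try to send, dequeue only on success, back off on failure. */
final class ResendLoopSketch {
    static final long BASE_DELAY_MS = 200;   // same as the fixed sleep in the post
    static final long MAX_DELAY_MS = 5_000;  // assumed cap, not taken from the post

    /** Delay after the given number of consecutive failures: 200, 400, 800, ... capped. */
    static long nextDelayMs(int consecutiveFailures) {
        int exponent = Math.max(0, Math.min(consecutiveFailures - 1, 10)); // bounded shift
        return Math.min(BASE_DELAY_MS << exponent, MAX_DELAY_MS);
    }

    /**
     * Drains the queue in order and returns the number of send attempts made.
     * The head element is removed only after the sender reports success,
     * so a failed attempt never loses a message.
     */
    static int drain(Deque<String> queue, Predicate<String> sender, int maxAttempts) {
        int attempts = 0;
        int failures = 0;
        while (!queue.isEmpty() && attempts < maxAttempts) {
            String head = queue.peekFirst();
            attempts++;
            if (sender.test(head)) {
                queue.pollFirst();   // dequeue only after a confirmed send
                failures = 0;
            } else {
                failures++;
                try {
                    Thread.sleep(nextDelayMs(failures));
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // keep the interrupt flag and stop
                    break;
                }
            }
        }
        return attempts;
    }
}
```

In the real thread, the predicate would wrap the blocking send in a try/catch and return false on any exception.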

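Everything worked fine until I decided to test a Kafka broker kill/restart scenario:

I killed my Kafka broker node and sent 5 messages using my Kafka producer. My producer application logged the following messages: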

....the application works fine....
// kafka broker was killed
2017-11-10 09:20:44,594 WARN [org.apache.kafka.clients.NetworkClient] - <Connection to node 0 could not be established. Broker may not be available.>
2017-11-10 09:20:44,646 WARN [org.apache.kafka.clients.NetworkClient] - <Connection to node 0 could not be established. Broker may not be available.>
2017-11-10 09:20:44,700 WARN [org.apache.kafka.clients.NetworkClient] - <Connection to node 0 could not be established. Broker may not be available.>
2017-11-10 09:20:44,759 WARN [org.apache.kafka.clients.NetworkClient] - <Connection to node 0 could not be established. Broker may not be available.>
2017-11-10 09:20:44,802 WARN [org.apache.kafka.clients.NetworkClient] - <Connection to node 0 could not be established. Broker may not be available.>
// sent 5 messages using the producer. The messages were put into the failedMessagesQueue and the "re-sender" thread started resending
2017-11-10 09:20:44,905 ERROR [com.inq.kafka.KafkaETLService] - <Kafka producer error. Topic: events.Message will be added into failed messages queue.>
....
2017-11-10 09:20:45,070 WARN [org.apache.kafka.clients.NetworkClient] - <Connection to node 0 could not be established. Broker may not be available.>
2017-11-10 09:20:45,129 WARN [org.apache.kafka.clients.NetworkClient] - <Connection to node 0 could not be established. Broker may not be available.>
2017-11-10 09:20:45,170 WARN [org.apache.kafka.clients.NetworkClient] - <Connection to node 0 could not be established. Broker may not be available.>
2017-11-10 09:20:45,217 WARN [org.apache.kafka.clients.NetworkClient] - <Connection to node 0 could not be established. Broker may not be available.>

// kafka broker was restarted, some strange errors were logged
2017-11-10 09:20:51,103 WARN [org.apache.kafka.clients.NetworkClient] - <Error while fetching metadata with correlation id 29 : {events=INVALID_REPLICATION_FACTOR}>
2017-11-10 09:20:51,205 WARN [org.apache.kafka.clients.NetworkClient] - <Error while fetching metadata with correlation id 31 : {events=INVALID_REPLICATION_FACTOR}>
2017-11-10 09:20:51,308 WARN [org.apache.kafka.clients.NetworkClient] - <Error while fetching metadata with correlation id 32 : {events=INVALID_REPLICATION_FACTOR}>
2017-11-10 09:20:51,114 WARN [org.apache.kafka.clients.producer.internals.Sender] - <Received unknown topic or partition error in produce request on partition events-0. The topic/partition may not exist or the user may not have Describe access to it>
2017-11-10 09:20:51,114 ERROR [com.inq.kafka.KafkaETLService] - <Kafka message resending attempt was failed. Topic events. org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server does not host this topic-partition.>
2017-11-10 09:20:52,485 WARN [org.apache.kafka.clients.NetworkClient] - <Error while fetching metadata with correlation id 33 : {events=INVALID_REPLICATION_FACTOR}>
// messages were successfully re-sent and received by the consumer

How can I get rid of these logs (logged roughly every 100 ms while the Kafka broker is down):

[org.apache.kafka.clients.NetworkClient] - <Connection to node 0 could not be established. Broker may not be available.>
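
One direction I am considering, sketched below under assumptions I have not verified on this setup: the warnings in the log above are about 50 ms apart, which matches what I understand to be the client's default `reconnect.backoff.ms`, so raising it should at least make them less frequent. The class and method names are hypothetical, the values are only illustrative, and I use the plain string keys instead of the `ProducerConfig` constants so the snippet compiles without the Kafka client on the classpath.

```java
import java.util.Properties;

/** Hypothetical helper: producer properties with slower reconnect attempts. */
final class ReconnectBackoffSketch {
    static Properties withSlowerReconnects(String brokerUrl) {
        Properties props = new Properties();
        props.put("bootstrap.servers", brokerUrl);
        // Wait before retrying a connection to a broker (default is 50 ms, as far as I know).
        props.put("reconnect.backoff.ms", "1000");
        // Upper bound for the growing backoff; my understanding is this key exists from 0.11.0 on.
        props.put("reconnect.backoff.max.ms", "10000");
        return props;
    }
}
```

The serializer and partitioner settings shown earlier would still have to be added to the returned properties. The other option I can think of is raising the log level of the `org.apache.kafka.clients.NetworkClient` logger in the logging configuration, which hides the warnings without changing how often the client reconnects.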

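Why do I receive the following errors after the Kafka broker starts up (I did not change any server properties and did not alter the topic)? It seems to me that these errors are the result of some synchronization process between ZooKeeper and Kafka during broker startup, because after some time the producer successfully resent my messages. Am I wrong?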

[org.apache.kafka.clients.NetworkClient] - <Error while fetching metadata with correlation id 29 : {events=INVALID_REPLICATION_FACTOR}>
Received unknown topic or partition error in produce request on partition events-0. The topic/partition may not exist or the user may not have Describe access to it.