用于测试消费者故障检测的简单kafka客户端不提供预期的行为 . 我一定错过了什么 .

使用kafka版本0.10.1.0和使用java kafka-client 0.10.1.0的使用者进行测试 .

下课是平行两次午餐 . 正如所料,一个客户正在消费该组中的主题 . 但是如果使用kill -9杀死活跃的消费者,则该组不会重新 balancer 到其他消费者 .

public class BasicConsumer {

    public BasicConsumer() {
        // set up the consumer
        Properties props = new Properties();
        props.put("bootstrap.servers",       "localhost:9092");
        props.put("group.id",                "test");
        props.put("enable.auto.commit",      "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms",      "30000"); // half a minute timeout
        props.put("max.poll.records",        "10");
        props.put("key.deserializer",        "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",      "org.apache.kafka.common.serialization.StringDeserializer");
        System.out.printf("Starting Consumer %n");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("test1"));

        LocalDateTime inFewMinutes = LocalDateTime.now().plusMinutes(10);
        try {
            while (LocalDateTime.now().isBefore(inFewMinutes)) {
                ConsumerRecords<String, String> records = consumer.poll(1000);
                System.out.printf("%s Poll returned %d records%n", LocalDateTime.now(), records.count());
                for (ConsumerRecord<String, String> record : records) {
                    Map    message = new Gson().fromJson(record.value(), Map.class);
                    Map    data    = (Map<String, String>) message.get("data");
                    String msgId   = (String)              data.get("TRANSFER_ID");

                    System.out.printf("%s Handling record id %s with offset %s%n", LocalDateTime.now(), msgId, record.offset());
                    try {
                        Thread.sleep(10);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        } finally {
            consumer.close();
            System.out.printf("Consumer closed cleanly...%n");
        }
    }
}

kafka和zookeeper服务器是简单的安装,无需对配置进行任何修改 .

提前感谢任何想法 .

late edition :问题已解决

为了杀死消费者,我停止了用于启动java客户端的 gradle run 命令 . 这实际上并没有阻止java进程......

正确杀死java进程表明,在日志中,被杀死的活动消费者与使用kafka进行的重新 balancer 之间会出现30秒的延迟,以便将手提供给第二个消费者 . 正如session.timeout.ms参数所预期的那样 .