我是一名研究和玩 Kafka 的学生 . 在关注Apache文档的示例之后,我正在使用当前Github仓库的主干中的示例部分 .
截至目前,该示例实现了 Consumer
的'older'版本,并且未使用新的 KafkaConsumer
. 在文档之后,我编写了自己的 KafkaConsumer
版本,认为它会更快 .
这是一个模糊的问题,但是在runthrough上我生成5000个简单的消息,如"Message_CurrentMessageNumber"到主题"test",然后使用我的消费者获取这些消息并将它们打印到 stdout
. 当我运行示例代码用更新的 KafkaConsumer
(v 0.8.2及更高版本)替换提供的消费者时,它的工作速度非常快,与第一个runthrough中的示例相当,但在此之后的任何时候都会大幅减速 .
我注意到我的 Kafka Server
输出
重新 balancer 组group1第3代(kafka.coordinator.ConsumerCoordinator)
或类似的消息经常让我相信Kafka必须做某种负载 balancer ,这减慢了东西,但我想知道是否有其他人有洞察我做错了什么 .
public class AlternateConsumer extends Thread {
private final KafkaConsumer<Integer, String> consumer;
private final String topic;
private final Boolean isAsync = false;
public AlternateConsumer(String topic) {
Properties properties = new Properties();
properties.put("bootstrap.servers", "localhost:9092");
properties.put("group.id", "newestGroup");
properties.put("partition.assignment.strategy", "roundrobin");
properties.put("enable.auto.commit", "true");
properties.put("auto.commit.interval.ms", "1000");
properties.put("session.timeout.ms", "30000");
properties.put("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");
properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
consumer = new KafkaConsumer<Integer, String>(properties);
consumer.subscribe(topic);
this.topic = topic;
}
public void run() {
while (true) {
ConsumerRecords<Integer, String> records = consumer.poll(100);
for (ConsumerRecord<Integer, String> record : records) {
System.out.println("We received message: " + record.value() + " from topic: " + record.topic());
}
}
// ConsumerRecords<Integer, String> records = consumer.poll(0);
// for (ConsumerRecord<Integer, String> record : records) {
// System.out.println("We received message: " + record.value() + " from topic: " + record.topic());
// }
// consumer.close();
}
}
开始:
package kafka.examples;
public class KafkaConsumerProducerDemo implements KafkaProperties
{
public static void main(String[] args) {
final boolean isAsync = args.length > 0 ? !args[0].trim().toLowerCase().equals("sync") : true;
Producer producerThread = new Producer("test", isAsync);
producerThread.start();
AlternateConsumer consumerThread = new AlternateConsumer("test");
consumerThread.start();
}
}
生产环境 者是这里的默认 生产环境 者:https://github.com/apache/kafka/blob/trunk/examples/src/main/java/kafka/examples/Producer.java
1 回答
情况并非如此 . 如果您的两个消费者之间的设置相似,那么除非客户端/消费者实现中存在问题,否则您应该期望新消费者获得更好的结果,这似乎就是这种情况 .
您是否可以分享您的基准测试结果和报告的重新 balancer 和/或任何模式的频率(例如,在启动后,在固定消息消耗之后,队列耗尽之后等等),您正在观察 . 此外,如果您可以分享有关您的客户实施的一些细节 .