我遇到了 Kafka 消费者的问题 . 我使用新的Kafka和新的Consumer Java API . 来自quickstart的最简单的Kafka和Zookeeper .
我启动应用程序,并且在我的消费者消费了几次主题消息后,它停止接收 .
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
public class MyKC{
public MyKC(){
Properties config = new Properties();
config.put("zookeeper.connect", "localhost:2181");
config.put("group.id", "default");
config.put("bootstrap.servers", "localhost:9092");
config.put("enable.auto.commit", "true");
config.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
config.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(config);
TopicPartition tp = new TopicPartition("connect-test", 0);
List<TopicPartition> ltp = Arrays.asList(tp);
consumer.assign(ltp);
consumer.seekToEnd(ltp);
ConsumerRecords<String, String> records;
while(true){
records = consumer.poll(1000);
for (ConsumerRecord<String, String> record : records)
System.out.println(record.value());
}
}
}
当我附加到我的代码计数消息数量:
while(true){
records = consumer.poll(1000);
System.out.print(records.count() + "; ");
}
我看到,在每次迭代时,消费者都不会获得帖子 . 它看起来像这样:
1; 1; 1; 0; 0; 30; 70; 1; 1; 21; 16; 2; 1; 1; 8; 49; 2; 1; 62; 35; 5; 11; 47; 2; 1; 1; 1; 1; 31; 1; 0; 0; 0; 0; 0; 0; 0; 0; 0; 0; 0; 0; 0; 0; 0; 0; 0; 0; 0; 0;等等
可能需要很长时间(5分钟以上)或几秒钟,但我每次都会得到相同的结果 .
当我重新启动消费者时,历史重演 . 我确信这些消息继续以每秒100个的速度到达主题 .
有谁有想法吗?
更新
顺便说一句,如果我使用像描述on this page这样的高级消费者,这个消费者有像以前一样的问题,但是直到我没有重新启动kafka服务器它才从主题获取消息 .
如果我重新启动他而不是服务器,第一个消费者(简单)会继续传递消息 .
如果我使用subscribe()方法,我必须重新启动kafka服务器,如果我希望该消费者收到消息 . 如果我使用assign()方法,我必须只重新启动我的消费者,它会在一段时间内收到数据 .
更新2
关于此的更多数据 .
如果我设置这样的消费者配置: consumer.seekToBeginning(ltp);
我的消费者在没有任何问题的情况下始终收到消息,直到到达抵消结束 . 然后消费者更难接收消息,直到它停止 .
如果我在 consumer.seekToEnd(ltp);
中设置此设置,开始接收消息前几秒没有问题,然后逐渐停止 .
可能是与消息偏移相关的问题?
更新3
这是我对@fhussonnois发表评论的回答 .
对不起,但我的英语水平不会让我在飞行中阅读Javadoc . 如果我正确理解了这种方法的描述,那么poll(Long.MAX_VALUE)会让消费者等待29.2万年,而我们知道Kafka集群以每秒100的速度重新获取数据 .
现在我创建了我的脏修复程序,我在测试中启动了它 . 它看起来像这样:
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
public class MyKC{
private Properties config;
private KafkaConsumer<String, String> consumer;
private TopicPartition tp;
private List<TopicPartition> ltp;
private ConsumerRecords<String, String> records;
private long offset = 0;
public MyKC(){
config = new Properties();
config.put("zookeeper.connect", "localhost:2181");
config.put("group.id", "default");
config.put("bootstrap.servers", "localhost:9092");
config.put("enable.auto.commit", "true");
config.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
config.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
consumer = new KafkaConsumer<String, String>(config);
tp = new TopicPartition("connect-test", 0);
ltp = Arrays.asList(tp);
consumer.assign(ltp);
consumer.seekToEnd(ltp);
consume();
}
private void newConsumer(long offset){
consumer = new KafkaConsumer<String, String>(config);
consumer.assign(ltp);
consumer.seek(tp, offset);
consume();
}
private void restart(){
offset = consumer.endOffsets(ltp).get(tp);
consumer.close();
consumer = null;
newConsumer(offset);
}
public void consume(){
long time = System.currentTimeMillis();
while (true) {
records = consumer.poll(1000);
if (records.count() != 0){
time = System.currentTimeMillis();
for (ConsumerRecord<String, String> record : records){
System.out.println(record.value());
}
} else {
if ((System.currentTimeMillis() - time) >= 30000){
restart();
}
}
}
}
}
在这一刻,它工作了2个小时,并重新启动了14次 .
在我取消这个测试后,我会尝试你的方式 . :)