我刚开始使用Kafka . 我正面临着与消费者的小问题 . 我用Java编写了一个使用者 .
我得到此异常 - IllegalStateException此消费者已被关闭 .
我在以下行中得到例外:
ConsumerRecords<String,String> consumerRecords = consumer.poll(1000);
这种情况在我的消费者遇到一些异常后崩溃时开始发生,当我再次尝试运行它时,它给了我这个例外 .
这是完整的代码:
package StreamApplicationsTest;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.util.*;
public class StreamAppConsumer {
public static void main(String[] args){
int i = 0;
//List<String> topics = new ArrayList<>();
List<String> topics = Collections.singletonList("test_topic");
//topics.add("test_topic");
Properties consumerConfigurations = new Properties();
consumerConfigurations.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");
consumerConfigurations.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumerConfigurations.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
consumerConfigurations.put(ConsumerConfig.GROUP_ID_CONFIG,"TestId");
Consumer<String,String> consumer = new KafkaConsumer<>(consumerConfigurations);
consumer.subscribe(topics);
while(true){
ConsumerRecords<String,String> consumerRecords = consumer.poll(1000);
Iterator<ConsumerRecord<String,String>> iterator = consumerRecords.iterator();
while(iterator.hasNext()){
i++;
ConsumerRecord<String,String> consumerRecord = iterator.next();
String key = consumerRecord.key();
String value = consumerRecord.value();
if(key=="exit" || value=="exit")
break;
System.out.println("Key="+key+"\tValue="+value);
}
System.out.println("Messages processed = "+Integer.toString(i));
consumer.close();
}
}
}
我只是坚持这个问题,任何形式的帮助都会有用 .
2 回答
这种情况正在发生,因为您在无限循环结束时关闭了消费者,所以当它第二次轮询消费者已经关闭时 . 为了解决当前问题,我将整个
while(true)
循环包装在try-catch中,并在catch或finally块中处理消费者关闭 .但是,如果不同的关闭信号,请建议查看Confluent的示例,以获得正常的消费者关闭here . 在你的情况下,你're running in the main thread it'看起来像这样......
基本上运行
consumer.wakeup()
是消费者中唯一的线程安全方法,所以它是's the only one than can be ran inside of Java'的关闭钩子 . 由于消费者在叫醒时没有睡着,所以它会使唤醒行为跳闸,从而优雅地关闭消费者 .这似乎有效
确保您的服务器或本地kafka可以访问
产量