Kafka Java Consumer is already closed

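I have just started using Kafka and am running into a small problem with a consumer that I wrote in Java.

I get this exception: IllegalStateException: This consumer has already been closed.

The exception is thrown on the following line: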

ConsumerRecords<String,String> consumerRecords = consumer.poll(1000);

This started happening after my consumer crashed with some exception. When I tried to run it again, it gave me this exception.

Here is the complete code:

package StreamApplicationsTest;

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.*;

public class StreamAppConsumer {

public static void main(String[] args){
    int i = 0;
    //List<String> topics = new ArrayList<>();
    List<String> topics = Collections.singletonList("test_topic");
    //topics.add("test_topic");
    Properties consumerConfigurations = new Properties();
    consumerConfigurations.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");
    consumerConfigurations.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    consumerConfigurations.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
    consumerConfigurations.put(ConsumerConfig.GROUP_ID_CONFIG,"TestId");

    Consumer<String,String> consumer = new KafkaConsumer<>(consumerConfigurations);
    consumer.subscribe(topics);

    while(true){
        ConsumerRecords<String,String> consumerRecords = consumer.poll(1000);
        Iterator<ConsumerRecord<String,String>> iterator = consumerRecords.iterator();
        while(iterator.hasNext()){
            i++;
            ConsumerRecord<String,String> consumerRecord = iterator.next();
            String key = consumerRecord.key();
            String value = consumerRecord.value();
            if(key=="exit" || value=="exit")
                break;
            System.out.println("Key="+key+"\tValue="+value);
        }

        System.out.println("Messages processed = "+Integer.toString(i));
        consumer.close();

    }
}
}

I am stuck on this problem, so any kind of help would be appreciated.

2 Answers

  • 0

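    This happens because you close the consumer at the end of each pass through the infinite loop, so by the time it polls a second time the consumer has already been closed. To fix the immediate problem, I would wrap the entire while(true) loop in a try-catch and close the consumer in the catch or finally block.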
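The difference between closing inside the loop and closing once in finally can be reproduced without a broker. The sketch below is only an illustration: FakeConsumer is a hypothetical stand-in written for this example, not part of the Kafka client, and its poll() simply mimics the way a closed consumer rejects further calls.

```java
import java.util.ArrayList;
import java.util.List;

public class CloseOnceDemo {

    // Hypothetical stand-in for KafkaConsumer: poll() fails once close() has been called.
    static class FakeConsumer implements AutoCloseable {
        private boolean closed = false;
        private int polls = 0;

        String poll() {
            if (closed) {
                throw new IllegalStateException("This consumer has already been closed.");
            }
            polls++;
            return "record-" + polls;
        }

        @Override
        public void close() {
            closed = true;
        }

        boolean isClosed() {
            return closed;
        }
    }

    // Mirrors the question: close() sits inside the loop, so the second poll() throws.
    static String closeInsideLoop(FakeConsumer consumer) {
        try {
            for (int i = 0; i < 3; i++) {
                consumer.poll();
                consumer.close();
            }
            return "no error";
        } catch (IllegalStateException e) {
            return e.getMessage();
        }
    }

    // Mirrors the fix: poll inside the loop, close exactly once in finally.
    static List<String> closeInFinally(FakeConsumer consumer) {
        List<String> seen = new ArrayList<>();
        try {
            for (int i = 0; i < 3; i++) {
                seen.add(consumer.poll());
            }
        } finally {
            consumer.close();
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(closeInsideLoop(new FakeConsumer()));
        System.out.println(closeInFinally(new FakeConsumer()));
    }
}
```

The first call fails on its second iteration, while the second call polls three times and still leaves the consumer closed afterwards.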

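    However, to handle other shutdown signals as well, I suggest looking at Confluent's example of a graceful consumer shutdown. In your case, since you're running in the main thread, it would look something like this...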

    public static void main(String[] args) {
        int i = 0;
        //List<String> topics = new ArrayList<>();
        List<String> topics = Collections.singletonList("test_topic");
        //topics.add("test_topic");
        Properties consumerConfigurations = new Properties();
        consumerConfigurations.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        consumerConfigurations.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        consumerConfigurations.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        consumerConfigurations.put(ConsumerConfig.GROUP_ID_CONFIG, "TestId");
    
        Consumer<String, String> consumer = new KafkaConsumer<>(consumerConfigurations);
        consumer.subscribe(topics);
    
        Runtime.getRuntime().addShutdownHook(new Thread()
        {
          public void run() {
            consumer.wakeup();
          }
        });
    
        try {
          while (true) {
            ConsumerRecords<String, String> consumerRecords = consumer.poll(1000);
            Iterator<ConsumerRecord<String, String>> iterator = consumerRecords.iterator();
            while (iterator.hasNext()) {
              i++;
              ConsumerRecord<String, String> consumerRecord = iterator.next();
              String key = consumerRecord.key();
              String value = consumerRecord.value();
              if ("exit".equals(key) || "exit".equals(value)) // compare contents, not references
                break;
              System.out.println("Key=" + key + "\tValue=" + value);
            }
            System.out.println("Messages processed = " + Integer.toString(i));
          }
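        } catch (WakeupException e) { // org.apache.kafka.common.errors.WakeupException
          // Expected on shutdown: fall through to finally and close the consumer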
        } finally {
          consumer.close();
        }
      }
    }
    

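    Basically, consumer.wakeup() is the only thread-safe method on the consumer, so it's the only one that can be run inside of Java's shutdown hook. When wakeup() is called, the current poll() (or the next one, if the consumer is not blocked in poll() at that moment) throws a WakeupException, which falls through to the finally block and closes the consumer gracefully.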
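The hand-off between the signalling thread and the polling thread can be sketched with the standard library alone. Everything here is a hypothetical stand-in written for illustration: FakeConsumer and WakeupSignal imitate the consumer and WakeupException, and demo() plays the role of the shutdown hook. It is a sketch of the pattern under those assumptions, not the Kafka client's implementation.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.LockSupport;

public class WakeupDemo {

    // Hypothetical stand-in for org.apache.kafka.common.errors.WakeupException.
    static class WakeupSignal extends RuntimeException {}

    // Hypothetical stand-in for a consumer; wakeup() is the only method called from another thread.
    static class FakeConsumer {
        private final AtomicBoolean wakeupRequested = new AtomicBoolean(false);
        private volatile boolean closed = false;

        void wakeup() {
            wakeupRequested.set(true);
        }

        // Waits up to timeoutMs, but aborts with WakeupSignal as soon as a wakeup is pending.
        void poll(long timeoutMs) {
            long deadline = System.nanoTime() + timeoutMs * 1_000_000L;
            do {
                if (wakeupRequested.getAndSet(false)) {
                    throw new WakeupSignal();
                }
                LockSupport.parkNanos(5_000_000L); // short nap between checks
            } while (System.nanoTime() < deadline);
        }

        void close() {
            closed = true;
        }

        boolean isClosed() {
            return closed;
        }
    }

    // The poll loop: runs until woken, then always closes the consumer.
    static int runUntilWoken(FakeConsumer consumer) {
        int polls = 0;
        try {
            while (true) {
                consumer.poll(50);
                polls++;
            }
        } catch (WakeupSignal e) {
            // Expected on shutdown; nothing to do here.
        } finally {
            consumer.close();
        }
        return polls;
    }

    // Plays the role of the shutdown hook: signal the loop, then wait for it to finish closing.
    static boolean demo() {
        FakeConsumer consumer = new FakeConsumer();
        Thread worker = new Thread(() -> runUntilWoken(consumer));
        worker.start();
        consumer.wakeup();
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return consumer.isClosed();
    }

    public static void main(String[] args) {
        System.out.println("closed cleanly: " + demo());
    }
}
```

Joining the worker lets the signalling side wait until close() has finished. A real shutdown hook would need to wait for the same reason, since the JVM halts once its hooks return.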

  • 4

    This seems to work:

    public static void main(String[] args) {
    
            List<String> topics = new ArrayList<>();
            topics.add("test.topic");
    
            final Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "IP_TO_KAFKA_SERVER");
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
    
            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
            consumer.subscribe(topics);
    
            System.out.println("Polling");
            ConsumerRecords<String, String> consumerRecords = consumer.poll(5000);
    
            try {
                for (ConsumerRecord<String, String> record : consumerRecords) {
                    System.out.println(record.offset() + ": " + record.value());
                }
            } finally {
                consumer.close();
            }
        }
    

    Make sure your Kafka server, whether remote or local, is reachable.

    Output:

    --- exec-maven-plugin:1.2.1:exec (default-cli) @ MVN ---
    SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
    SLF4J: Defaulting to no-operation (NOP) logger implementation
    Polling
    SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
    6: test
    7: tes
    
