首页 文章

KAFKA Java消费者无法工作

提问于
浏览
0

我无法让我的java消费者在本地主机上工作 . 控制台用户工作正常 . 以下是我的消费者代码 .

public class TestConsumer {public static void main(String [] args)throws Exception {

//Kafka consumer configuration settings
  String topicName = "test";// args[0].toString();
  Properties props = new Properties();

  props.put("bootstrap.servers", "localhost:9092");
  props.put("group.id", "test-consumer-group");
  props.put("enable.auto.commit", "true");
  props.put("auto.commit.interval.ms", "1000");
  props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
  props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

  KafkaConsumer<String, String> consumer = new KafkaConsumer <String, String>(props);

  //Kafka Consumer subscribes list of topics here.
  consumer.subscribe(Arrays.asList(topicName));

  //print the topic name
  System.out.println("Subscribed to topic " + topicName);
  int i = 0;

  while (true) {
      System.out.printf("while loop");
      ConsumerRecords<String, String> records  = consumer.poll(1000);

      for (ConsumerRecord<String, String> record : records) // print the offset,key and value for the consumer records.
          System.out.printf("offset = %d, key = %s, value = %s\n",record.offset(), record.key(), record.value());
  }

}}

Java 生产环境 者在同一主题上工作得很好 .

public class TestProducer {public static void main(String [] args)throws Exception {

//Assign topicName to string variable
      String topicName = "test";//args[0].toString();
      Properties props = new Properties();
      props.put("bootstrap.servers", "localhost:9092");
      props.put("acks", "all");
      props.put("retries", 0);
      props.put("batch.size", 16384);
      props.put("linger.ms", 1);
      props.put("buffer.memory", 33554432);
      props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");

      props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");

      Producer<String, String> producer = new KafkaProducer
         <String, String>(props);

      for(int i = 0; i < 10; i++)
         producer.send(new ProducerRecord<String, String>(topicName, 
            Integer.toString(i), Integer.toString(i)));
               System.out.println("Message sent successfully");
               producer.close();
        }

}

1 回答

  • 0

    我有另一个消费者实现,并已开始工作 . 我猜是我放的属性不正确 . 公共类KafkaConsumer {private ConsumerConnector consumerConnector = null; private final String topic =“test”;

    public void initialize() {
          Properties props = new Properties();
          props.put("zookeeper.connect", "localhost:2181");
          props.put("group.id", "testgroup");
          props.put("zookeeper.session.timeout.ms", "400");
          props.put("zookeeper.sync.time.ms", "300");
          props.put("auto.commit.interval.ms", "1000");
          ConsumerConfig conConfig = new ConsumerConfig(props);
          consumerConnector = Consumer.createJavaConsumerConnector(conConfig);
    }
    
    public void consume() {
          //Key = topic name, Value = No. of threads for topic
          Map<String, Integer> topicCount = new HashMap<String, Integer>();       
          topicCount.put(topic, new Integer(1));
    
          //ConsumerConnector creates the message stream for each topic
          Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreams =   consumerConnector.createMessageStreams(topicCount);         
    
          // Get Kafka stream for topic 'zinguplife'
          List<KafkaStream<byte[], byte[]>> kStreamList = consumerStreams.get(topic);
          // Iterate stream using ConsumerIterator
          for (final KafkaStream<byte[], byte[]> kStreams : kStreamList) {
                 ConsumerIterator<byte[], byte[]> consumerIte = kStreams.iterator();
    
                 while (consumerIte.hasNext())
                        System.out.println("Message consumed from topic    [" + topic + "] : "  + new String(consumerIte.next().message()));              
          }
          //Shutdown the consumer connector
          if (consumerConnector != null)   consumerConnector.shutdown();          
    }
    
    public static void main(String[] args) throws InterruptedException {
          KafkaConsumer kafkaConsumer = new KafkaConsumer();
          // Configure Kafka consumer
          kafkaConsumer.initialize();
          // Start consumption
          kafkaConsumer.consume();
    }
    

相关问题