首页 文章

KAFKA REMOTE AWS consumer.poll

提问于
浏览
0

嗨,我一直在尝试学习KAFKA并与我的远程轮询器/消费者有问题 .

我在AWS EC2实例中使用私有和公共ip设置了KAFKA . 我的server.properties看起来像这样 .

listeners = PLAINTEXT://172.31.31.58:9092 #AWS Private IP

advertised.listeners = PLAINTEXT:// 35 . ?? . ?? . ??:9092 #AWS Public IP Masked

我的AWS EC2安全组配置为允许任何端口上的任何IP上的流量用于测试目的 .

当我使用以下脚本在我的EC2实例中本地生成/使用消息时,它可以完美地工作

bin / kafka-console-producer.sh --broker-list localhost:9092 - topic test

bin / kafka-console-consumer.sh --bootstrap-server localhost:9092 - topic test --from-beginning

但是当我尝试从运行我的Java API的远程笔记本电脑Eclipse代码连接到同一个kafka实例时,我的代码在consumer.poll(100)中永远挂起 . 我在这里做错了吗?

Properties props = new Properties();
     props.put("bootstrap.servers", "35.??.??.??:9092");//my aws public ip configured in advertised.listeners
     props.put("group.id", "test123");
     props.put("enable.auto.commit", "false");
     props.put("auto.commit.interval.ms", "1000");
     props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
     props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
     KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
     consumer.subscribe(Arrays.asList("test"));
     while (true) {
         ConsumerRecords<String, String> records = consumer.poll(100);
         for (ConsumerRecord<String, String> record : records)
             System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value());

     } }

1 回答

  • 1

    你确定它挂在了 poll() 吗?或者是 poll() 刚刚返回一个空的 ConsumerRecords 并且它在 while(true) 中循环?

    默认情况下,如果您没有为组提交任何偏移量,则使用者将在主题末尾开始,因此它只会接收新消息 . 在这种情况下,如果要使用主题中已有的消息,则需要将 auto.offset.reset 设置为 earliest (就像在控制台消费者中使用 --from-beginning 一样)

    编辑:

    如果它实际上卡在 poll() 中,则可能是连接问题 . 要找到答案,最好的方法是在启用日志记录的情况下运行客户端 . 创建一个包含以下内容

    log4j.rootLogger=DEBUG, stdout
    log4j.appender.stdout=org.apache.log4j.ConsoleAppender
    log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
    log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c)%n
    

    -Dlog4j.configuration=file:PATH_TO_FILE 启动你的客户端

相关问题