首页 文章

可以 生产环境 到 Kafka 但不能消费

提问于
浏览
0

我正在使用Kafka JDK客户端ver 0.10.2.1 . 我能够为Kafka生成一个简单的消息进行"heartbeat"测试,但是我无法使用sdk消费来自同一主题的消息 . 当我进入Kafka CLI时,我能够使用该消息,所以我已经确认消息在那里 . 这里's the function I' m用于从我的Kafka服务器消费,使用道具 - 只有在我确实确认 produce() 成功之后才将我生成的消息传递给主题,如果请求我可以稍后发布该功能:

private def consumeFromKafka(topic: String, expectedMessage: String): Boolean = {
    val props: Properties = initProps("consumer")
    val consumer = new KafkaConsumer[String, String](props)
    consumer.subscribe(List(topic).asJava)
    var readExpectedRecord = false
    try {
      val records = {
        val firstPollRecs = consumer.poll(MAX_POLLTIME_MS)
        // increase timeout and try again if nothing comes back the first time in case system is busy
        if (firstPollRecs.count() == 0) firstPollRecs else {
          logger.info("KafkaHeartBeat: First poll had 0 records- trying again - doubling timeout to "
            + (MAX_POLLTIME_MS * 2)/1000 + " sec.")
          consumer.poll(MAX_POLLTIME_MS * 2)
        }
      }
      records.forEach(rec => {
        if (rec.value() == expectedMessage) readExpectedRecord = true
      })
    } catch {
      case e: Throwable => //log error
    } finally {
      consumer.close()
    }
    readExpectedRecord
  }

private def initProps(propsType: String): Properties = {
    val prop = new Properties()
    prop.put("bootstrap.servers", kafkaServer + ":" + kafkaPort)

    propsType match {
      case "producer" => {
        prop.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
        prop.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
        prop.put("acks", "1")
        prop.put("producer.type", "sync")
        prop.put("retries", "3")
        prop.put("linger.ms", "5")
      }
      case "consumer" => {
        prop.put("group.id", groupId)
        prop.put("enable.auto.commit", "false")
        prop.put("auto.commit.interval.ms", "1000")
        prop.put("session.timeout.ms", "30000")
        prop.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
        prop.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
        prop.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
        // poll just once, should only be one record for the heartbeat
        prop.put("max.poll.records", "1")
      }
    }
    prop
  }

现在当我运行代码时,这是它在控制台中输出的内容:

13:04:21 - 为组0b8947e1-eb68-4af3-ac7b-be3f7c02e76e发现了协调器serverName:9092(id:2147483647 rack:null) . 13:04:23 INFO oakcciConsumerCoordinator - 为组0b8947e1-eb68-4af3-ac7b-be3f7c02e76e 13:04:24撤销之前指定的分区[] INFO oakcciAbstractCoordinator - (重新)加入组0b8947e1-eb68-4af3-ac7b-be3f7c02e76e 13:04:25 INFO oakcciAbstractCoordinator - 已成功加入组0b8947e1-eb68-4af3-ac7b-be3f7c02e76e与第1代13:04:26 INFO oakcciConsumerCoordinator - 设置新分配的分区[HeartBeat_Topic.Service_5.2018-08-03.13_04_10.377 -0]对于组0b8947e1-eb68-4af3-ac7b-be3f7c02e76e 13:04:27 INFO cpplutil.KafkaHeartBeatUtil - KafkaHeartBeat:第一次民意调查有0条记录 - 再次尝试 - 将超时加倍到60秒 .

然后没有别的,没有抛出错误 - 所以没有记录任何记录 . 有谁知道是什么阻止了'消费'的发生?订阅者似乎是成功的,因为我能够成功调用listTopics并列出partions没问题 .

1 回答

  • 1

    您的代码有错误 . 看来你的路线:

    if (firstPollRecs.count() == 0)
    

    应该这样说

    if (firstPollRecs.count() > 0)
    

    否则,你传入一个空的 firstPollRecs ,然后迭代它,显然什么都不返回 .

相关问题