我使用的是kafka-0.10.2.0-rc2版本 . 我想知道无论如何要检查消费者群体的当前偏移量,其中消费者群体中没有消费者活跃 .

我尝试使用 kafka-consumer-groups.sh 进行描述,但它始终显示以下消息

Consumer group `<group_name>` is rebalancing.

我甚至检查了kafka源代码,这个消息是预期组中没有消费者活跃的消息 .

snipcode: ConsumerGroupCommand.scala

protected def describeGroup(group: String) {
      adminClient.describeConsumerGroup(group) match {
        case None => println(s"Consumer group `${group}` does not exist.")
        case Some(consumerSummaries) =>
          if (consumerSummaries.isEmpty)
            println(s"Consumer group `${group}` is rebalancing.")
          else {
            val consumer = getConsumer()
            printDescribeHeader()
            consumerSummaries.foreach { consumerSummary =>
              val topicPartitions = consumerSummary.assignment.map(tp => TopicAndPartition(tp.topic, tp.partition))
              val partitionOffsets = topicPartitions.flatMap { topicPartition =>
                Option(consumer.committed(new TopicPartition(topicPartition.topic, topicPartition.partition))).map { offsetAndMetadata =>
                  topicPartition -> offsetAndMetadata.offset
                }
              }.toMap
              describeTopicPartition(group, topicPartitions, partitionOffsets.get,
                _ => Some(s"${consumerSummary.clientId}_${consumerSummary.clientHost}"))
            }
          }
      }
    }