首页 文章

Kafka消费者 - 客户端没有在zookeeper上注册消费者群体的抵消

提问于
浏览
0

我正在尝试使用 kafka-clients v.0.10.2.1 创建具有不同消费者群组的多个消费者到kafka主题 . 虽然我无法检索消费者群体提交的最后一个偏移量 .

目前我的消费者属性看起来像这样

Properties cproperties = new Properties();
    cproperties.put(ConsumerConfig.GROUP_ID_CONFIG, groupID);
    cproperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, my-broker));
    cproperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
    cproperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    cproperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, taskDecoder.getClass());
    cproperties.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, "60000");

并且没有属性自动重置偏移我不能从主题消耗,但我不能使用此配置,我需要在zookeeper上注册的消费者组 . 所以,我还需要在zookeeper / consumer上创建一个消费者群体 .

1 回答

  • 0

    您需要将属性 auto.offset.reset 包含到 earliest (或 latest ,具体取决于您要实现的目标),以避免在未找到偏移时抛出异常(可能是因为数据已删除) .

    您还需要确保手动提交偏移量,因为您已禁用自动提交 .

    cproperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
    

    为此,您可以使用commitSync()

    这会向Kafka提供补偿 . 使用此API提交的偏移量将在每次重新 balancer 后以及启动时的第一次提取时使用 . 因此,如果您需要在Kafka以外的任何其他位置存储偏移量,则不应使用此API . 提交的偏移量应该是应用程序将使用的下一条消息,即lastProcessedMessageOffset 1 .

    commitAsync()

    这只向Kafka提交抵消 . 使用此API提交的偏移量将在每次重新 balancer 后以及启动时的第一次提取时使用 . 因此,如果您需要在Kafka以外的任何其他位置存储偏移量,则不应使用此API . 这是一个异步调用,不会阻塞 . 遇到的任何错误都会传递给回调(如果提供)或被丢弃 .

    请注意,如果您不提交偏移量,则在 auto.offset.reset 设置为 none 时将引发异常 .

    当Kafka中没有初始偏移或服务器上不再存在当前偏移时(例如因为该数据已被删除)该怎么办:最早:自动将偏移重置为最早的偏移最新:自动重置偏移到最新的偏移量:如果没有为消费者的组找到任何其他偏移量,则向消费者抛出异常:向消费者抛出异常 .

相关问题