首页 文章

在没有从Kafka 10消费者提交的情况下消费消息

提问于
浏览
0

我需要从主题中读取消息,批量处理并将批处理推送到外部系统 . 如果批处理由于任何原因而失败,我需要再次使用相同的消息集并重复该过程 . 因此,对于每个批处理,每个分区的from和to offset都存储在数据库中 . 为了实现这一点,我通过为阅读器分配分区来为每个分区创建一个Kafka使用者,基于之前存储的偏移量,消费者寻找该位置并开始阅读 . 我已经关闭了自动提交,我没有从消费者那里提交抵消 . 对于每个批处理,我为每个分区创建一个新的使用者,从存储的最后一个偏移量中读取消息并发布到外部系统 . 您是否在消费消息时看到任何问题而没有提交偏移并在批次中使用相同的消费者组,但是在任何时候每个分区都不会有多个消费者?

2 回答

  • 0

    你的设计对我来说似乎很合理 .

    承诺抵消 Kafka 只是 Kafka 内部的一个方便的内置机制,以跟踪抵消 . 但是,没有任何要求使用它 - 您也可以使用任何其他机制来跟踪偏移(如在您的情况下使用数据库) .

    此外,如果手动分配分区,则无论如何都不会进行组管理 . 因此参数 group.id 无效 . 有关详细信息,请参阅http://docs.confluent.io/current/clients/consumer.html .

  • 1

    在kafka版本2中,我实现了这种行为,而不需要数据库来存储偏移量 . 以下是spring-boot-kafka的配置,但它也适用于任何kafka消费者api

    spring:
      kafka:
        bootstrap-servers: ...
        consumer:
          value-deserializer: ...
          max-poll-records: 1000
          enable-auto-commit: false
          fetch-min-size: 262144 # 1/4 mb..
          group-id: ...
          fetch-max-wait: 10000 # we will consume every 10s or when 1/4 mb or 1000 records are accumulated.
          auto-offset-reset: earliest
        listener:
          type: batch
          concurrency: 7
          ack-mode: manual
    

    这给了我最多批量的消息 . 1000条记录(取决于负载) . 然后我将这些记录异步写入数据库并计算我得到多少次成功回调 . 如果成功写入等于收到的批量大小,则确认批次,例如我承诺抵消 . 即使在高负荷的 生产环境 环境中,这种设计也非常可靠 .

相关问题