我是Kafka的新手,我对消费者的理解是基本上有两种类型的实现 .
1)The High level consumer/consumer group
2)Simple Consumer
关于高级抽象的最重要部分是当Kafka不关心处理偏移时使用,而Simple消费者提供了对偏移管理更好的控制 . 令我困惑的是,如果我想在多线程环境中运行使用者并且还希望控制偏移量 . 如果我使用消费者组,这意味着我必须读取存储在zookeeper中的最后一个偏移量?这是我唯一的选择 .
我是Kafka的新手,我对消费者的理解是基本上有两种类型的实现 .
1)The High level consumer/consumer group
2)Simple Consumer
关于高级抽象的最重要部分是当Kafka不关心处理偏移时使用,而Simple消费者提供了对偏移管理更好的控制 . 令我困惑的是,如果我想在多线程环境中运行使用者并且还希望控制偏移量 . 如果我使用消费者组,这意味着我必须读取存储在zookeeper中的最后一个偏移量?这是我唯一的选择 .
2 回答
在大多数情况下,高级消费者API不允许您直接控制偏移量 .
首次创建使用者组时,您可以告诉它是否以kafka使用
auto.offset.reset
属性存储的最旧或最新消息开头 .您还可以通过将
auto.commit.enable
设置为false来控制高级别使用者何时向zookeeper提交新的偏移量 .由于高级消费者将偏移量存储在zookeeper中,您的应用程序可以直接访问zookeeper并操纵偏移量 - 但它将超出高级消费者API .
您的问题有点令人困惑,但您可以在多线程环境中使用简单的使用者 . 这就是高级消费者的作用 .
在Apache Kafka 0.9和0.10中,消费者组管理完全在Kafka应用程序中由Broker(用于协调)和主题(用于状态存储)处理 .
当消费者组首次订阅某个主题时,
auto.offset.reset
的设置确定消费者开始使用消息的位置(http://kafka.apache.org/documentation.html#newconsumerconfigs)您可以注册
ConsumerRebalanceListener
以在为特定客户分配主题/分区时接收通知 .消费者运行后,您可以使用
seek
,seekToBeginning
和seekToEnd
来获取特定偏移量的消息 .seek
影响该消费者的下一个poll
,并存储在下一次提交中(例如commitSync
,commitAsync
或auto.commit.interval过去时,如果启用) .消费者javadocs提到了更具体的情况:http://kafka.apache.org/0100/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html
一旦分配了分区,您就可以将Kafka提供的组管理与通过搜索(..)手动管理偏移相结合 .