首页 文章

如何使用Kafka 0.8.2的Consumer API?

提问于
浏览
17

我开始使用最新的Kafka文档http://kafka.apache.org/documentation.html . 但是当我尝试使用新的Consumer API时遇到了一些问题 . 我已完成以下步骤:

1. Add a new dependency

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.8.2.1</version>
</dependency>

2. Add configurations

Map<String, Object> config = new HashMap<String, Object>();
    config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
            "host:9092");
    config.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
    config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
            StringDeserializer.class.getName());
    config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
            StringDeserializer.class.getName());
    config.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY, "range");

3. Use KafkaConsumer API

KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(config);
consumer.subscribe("topic");

但是,当我尝试从代理轮询消息时,我只得到null:

Map<String, ConsumerRecords<String, String>> records = consumer.poll(0);
if (records != null)
    process(records);
else
    System.err.println("null");

然后,在检查源代码后,我知道消费者有什么问题:

@Override
public Map<String, ConsumerRecords<K,V>> poll(long timeout) {
    // TODO Auto-generated method stub
    return null;
}

更糟糕的是,我找不到关于0.8.2 API的任何其他有用信息,因为关于Kafka的所有用法都与最新版本不兼容 . 有人能帮助我吗?非常感谢 .

2 回答

  • 1

    我也试图在Kafka 0.8.2.1之上编写一个Consumer来读取新Producer生成的消息 .

    到目前为止,我得到的是Producer API已经准备好并且可用,而在消费者方面,我们必须等待0.8.3,正如@habsq所说,你已经发现有一些代码包含在Consumer中,但它仍然不起作用 .

    因此,使用的客户端(当前的客户端API)是您当前Kafka版本的“核心”项目中的一个,即0.8.2.1(最好不要将客户端降级到任何其他版本) .

    所以现在我们需要导入两个jar:一个用于“新”java客户端,一个用于核心项目,还取决于你使用的scala版本(我使用2.11) .

    在我的情况下,我使用graddle来管理依赖项,所以我只需要导入

    dependencies {
      compile group: 'org.apache.kafka', name: 'kafka-clients', version: '0.8.2.1'
      compile group: 'org.apache.kafka', name: 'kafka_2.11', version: '0.8.2.1'
    }
    

    更新依赖项时,它将获得所有需要的库 .

    如果您使用的是其他Scala版本,请更改版本;无论如何,你可以在maven中心找到所有不同的版本或完整的pom:http://search.maven.org/#search|ga|1|g%3A%22org.apache.kafka%22%20AND%20v%3A%220.8.2.1%22

    如果您使用这些Consumer实现,则所有当前示例都应该照常工作 .

    PS ref:Kafka-users ml thread http://grokbase.com/t/kafka/users/153bepxet5/high-level-consumer-example-in-0-8-2

  • 0

    是的我可以确认0.8.2.1版本在消费消息方面存在问题 . 现在使用Java / Groovy创建一个简单的使用者并发布0.10.1.0,一切都运行良好 .

    不再需要设置 PARTITION_ASSIGNMENT_STRATEGY .

相关问题