我是kafka的新手,我正在尝试使用Apache kafka 0.9.0 Java客户端构建一个简单的消费者 - 生产环境 者消息队列(传统队列)模型 .
从 生产环境 者流程,我将100个随机消息推送到配置有3个分区的主题 . 看起来很好 .
我创建了3个具有相同组ID的消费者线程,订阅了相同的主题 . 自动提交已启用 . 由于所有3个消费者线程都订阅了相同的主题,我假设每个消费者都会获得一个消费分区,并且每个分区都会提交偏移量日志 .
但我在这里遇到了奇怪的问题 . 我的所有邮件都是重复的 . 我从每个帖子的消费者端得到x次更多记录 . 由于我的每个消费者线程都进行无限循环以从主题进行轮询,因此我必须终止该进程 .
我甚至尝试使用单线程,但仍然会重复记录x次并仍然继续 .
可以请任何帮助我确定我在这里做的错误 .
我发布了我的消费者代码供您参考 .
public class ConsumerDemo {
public static void main(String[] args) {
ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("Consumer-%d").build();
ExecutorService executor = Executors.newFixedThreadPool(3, threadFactory);
executor.submit(new ConsumerThread("topic1", "myThread-1"));
executor.submit(new ConsumerThread("topic1", "myThread-2"));
executor.submit(new ConsumerThread("topic1", "myThread-3"));
//executor shutdown logic is skipped
}
}
消费者线程:
public class ConsumerThread implements Runnable {
private static final String KAFKA_BROKER = "<<IP:port>>";
private final KafkaConsumer<String, String> consumer;
public ConsumerThread(String topic, String name) {
Properties props = new Properties();
props.put("bootstrap.servers", ConsumerThread.KAFKA_BROKER);
props.put("group.id", "DemoConsumer");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "6000");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
this.consumer = new KafkaConsumer(props);
this.consumer.subscribe(Collections.singletonList(topic));
}
public void run() {
try {
boolean isRunning = true;
while (isRunning) {
ConsumerRecords<String,String> records= consumer.poll(10L);
System.out.println("Partition Assignment to this Consumer: "+consumer.assignment());
Iterator it = records.iterator();
while(it.hasNext()) {
ConsumerRecord record = (ConsumerRecord)it.next();
System.out.println("Received message from thread : "+Thread.currentThread().getName()+"(" + record.key() + ", " + (String)record.value() + ") at offset " + record.offset());
}
}
consumer.close();
}
catch (Exception e) {
e.printStackTrace();
}
}
}
同样非常重要的是,我的目标只是一次语义 . 我知道距离我1000英里 . 任何帮助都非常感谢 .
观察:调试sysout打印所有3个tpoics . 这是否意味着没有为每个消费者分配分区?
此消费者的分区分配:[topic1-1,topic1-0,topic1-2]
Kafka 专家,除了上述问题,我正在寻找其他2个输入 .
-
请帮我理解上面代码中的错误 .
-
一般来说,如何实现原理图 . 如果可能的话 .
-
消费者关闭等异常情况 . 如何处理而不丢失消息 .
提前致谢 .
3 回答
好吧,我发现我的代码/欠载有什么问题 .
在我开始进行原型设计之前,我应该完全阅读Kafka文档 .
这是我发现的 .
默认情况下,Kafka至少保证一次原理图 . 这意味着消费者至少获得一次消息(可能是多次 . 我假设如果我有3个分区并创建3个消费者,Kafka API将负责为一个消费者随机分配一个分区 . 这是错误的 .
所以我手动为每个消费者分配了一个分区,以确保我的消费者拥有分区并控制偏移,如下所示
恰好一次场景:为了确保我们消耗一次exaclty消息,我们需要控制偏移量 . 虽然到目前为止我还没有尝试,但基于我从谷歌搜索中学到的东西是它更好的方法来保存偏移量和数据 . 优选相同的交易 . 数据和偏移量都可以保存或回滚以进行重试 .
任何其他解决方案表示赞赏
您是否比会话超时更慢地使用消息?在这种情况下,您可能会发生重新 balancer 的风险,这可能会导致双重消费 .
排除因应用程序层中的 生产环境 者重试或重试而导致的重复 .
在 生产环境 者方面,如果没有为您的网络/集群正确配置“request.timeout.ms”,则可能由于启动缓慢(初始化 生产环境 者,连接 Build 等),初始请求将超时 生产环境 者,但实际上已由经纪人/服务器处理 . 这将导致重试重复 .