我有一个独特的问题,每天发生50-100次,每天的消息量约为2百万 . 我正在使用Kafka 生产环境 者API 0.8.2.1,我有12个经纪人(v 0.8.2.2)在运行复制4的prod . 我有一个包含60个分区的主题,我正在为所有消息计算分区,并在ProducerRecord本身中提供值 . 现在,问题 -

应用程序创建'ProducerRecord'使用 -

new ProducerRecord<String, String>(topic, 30, null, message1);

提供主题,值message1和分区30.然后应用程序调用send方法并返回future -

// null is for callback 
Future<RecordMetadata> future = producer.send(producerRecord. null);

现在,app通过调用get on Future打印偏移量和分区值,然后从RecordMetadata获取值 - 这就是我得到的 -

Kafka响应:分区30,偏移3416092

现在,应用程序生成下一条消息 - message2到同一分区 -

new ProducerRecord<String, String>(topic, 30, null, message2);

和 Kafka 的回应 -

Kafka响应:分区30,偏移3416092

我再次收到相同的偏移量,如果我使用简单的消费者从分区30的偏移量中拉出消息,它最终会成为 message2 ,这实际上意味着 lost message1 .

基于KafkaProducer文档KafkaProducer,我在10个线程中使用单个生成器实例(静态实例共享) .

生产环境 者是线程安全的,通常应该在所有线程之间共享以获得最佳性能 .

我正在使用 生产环境 者的所有默认属性(max.request.size:10000000除外),消息(字符串有效负载)大小可以是几kbs到500 kbs . 我使用的ack值为1 .

我在这做错了什么?有什么我可以查看或任何 生产环境 者属性或服务器属性我可以调整,以确保我不会丢失任何消息 . 我很快就需要一些帮助,因为我在 生产环境 中丢失了一些关键信息,因为除了下游流程报告之外,它甚至很难找到丢失的信息 .

EDIT:

服务器和客户端现在更新为kafka版本0.8.2.2 . 此外,10个应用程序线程现在每个都使用自己的kafka生成器实例 . 我们看到了更好的性能,但仍有消息丢失 .

Producer Properties:

value.serializer: org.apache.kafka.common.serialization.StringSerializer
key.serializer: org.apache.kafka.common.serialization.StringSerializer
bootstrap.servers: {SERVER VIP ENDPOINT}
acks: 1
batch.size: 204800
linger.ms: 10
send.buffer.bytes: 1048576
max.request.size: 10000000