我正在构建一个Kafka Consumer应用程序,它使用来自Kafka主题的消息并执行数据库更新任务 . 消息每天以大批量生成一次 - 因此主题在10分钟内加载了大约100万条消息 . 主题有8个分区 .
Spring Kafka Consumer(使用@KafkaListener注释并使用ConcurrentKafkaListenerContainerFactory)在非常短的批次中触发 .
批量大小有时只有1或2条消息 . 如果它可以一次消耗大约1000条消息并一起处理它(例如,我可以在单个更新SQL中更新数据库),而不是为每条消息连接到数据库,它将有助于性能 .
我已经尝试减少工厂中的并发性,以避免多个线程消耗较少数量的消息 .
我还将Kafka的server.properties中的 socket.send.buffer.bytes 属性从102400增加到1024000 .
这些步骤没有增加批量大小 .
我可以使用任何其他配置来增加消费者的浴缸尺寸吗?
1 回答
请参阅kafka使用者属性
max.poll.records
,_ 30283,fetch.max.wait.ms
,fetch.max.bytes
,max.partition.fetch.bytes
.很可能
fetch.min.bytes
和fetch.max.wait.ms
就是你所需要的 .