我们有一个项目正在使用Spring-AMQP来使用来自RabbitMQ代理的消息 . 我们希望增加消费方面的并发性,以便多个工作线程可以并行处理消息 . 我首先阅读了本机RabbitMQ客户端的文档,这引导我使用单个使用者的设计,并且预取计数> 1来控制并行性 . 直接使用RabbitMQ客户端,这似乎很自然 . DefaultConsumer
的 handleDelivery
方法可以生成一个新的 Runnable
来完成工作并在工作结束时确认消息 . 预取计数有效地控制了消费者产生的最大数量 Runnable
.
但是,这种设计似乎不适用于Spring-AMQP世界 . 在 SimpleMessageListenerContainer
中,对于每个AMQP使用者,所有消息都被传递到单个 BlockingQueueConsumer
,并且单个线程将消息从 BlockingQueueConsumer
的阻塞队列传递到 MessageListener
. 即使 SimpleMessageListenerContainer
支持 TaskExecutor
, TaskExecutor
仅用于为每个使用者运行一个任务 . 因此,要并行处理多个消息,必须使用多个消费者 .
这引出了一些关于Spring-AMQP并行性的问题 . 首先,我的初始设计是单个消费者和高预取计数是实现AMQP并行性的有效方法吗?如果是这样,为什么Spring-AMQP避开了这种设计,转而采用每个消费者的设计?是否可以自定义Spring-AMQP以实现单用户并行性?
1 回答
Spring AMQP是针对早期版本的Rabbit客户端库设计的,每个连接只有一个线程 .
这并不比简单地增加_134099更能给你带来更多的好处 - 唯一的区别是's a consumer for each thread, but there'并没有那么大的开销 .
你可以做你想要的,但是,使用
ChannelAwareMessageListener
并将acknowledgeMode设置为MANUAL
- 然后你的监听器负责处理消息 .在2.0(明年)中,我们将有一个替代的侦听器容器,它将直接在客户端库线程上调用侦听器 . 但是,这是相当多的工作 . This (closed) pull request has an initial PoC但它还不是一个功能齐全的容器 .