首页 文章

Spring AMQP具有预取的单一消费者并行性

提问于
浏览
1

我们有一个项目正在使用Spring-AMQP来使用来自RabbitMQ代理的消息 . 我们希望增加消费方面的并发性,以便多个工作线程可以并行处理消息 . 我首先阅读了本机RabbitMQ客户端的文档,这引导我使用单个使用者的设计,并且预取计数> 1来控制并行性 . 直接使用RabbitMQ客户端,这似乎很自然 . DefaultConsumerhandleDelivery 方法可以生成一个新的 Runnable 来完成工作并在工作结束时确认消息 . 预取计数有效地控制了消费者产生的最大数量 Runnable .

但是,这种设计似乎不适用于Spring-AMQP世界 . 在 SimpleMessageListenerContainer 中,对于每个AMQP使用者,所有消息都被传递到单个 BlockingQueueConsumer ,并且单个线程将消息从 BlockingQueueConsumer 的阻塞队列传递到 MessageListener . 即使 SimpleMessageListenerContainer 支持 TaskExecutorTaskExecutor 仅用于为每个使用者运行一个任务 . 因此,要并行处理多个消息,必须使用多个消费者 .

这引出了一些关于Spring-AMQP并行性的问题 . 首先,我的初始设计是单个消费者和高预取计数是实现AMQP并行性的有效方法吗?如果是这样,为什么Spring-AMQP避开了这种设计,转而采用每个消费者的设计?是否可以自定义Spring-AMQP以实现单用户并行性?

1 回答

  • 1

    Spring AMQP是针对早期版本的Rabbit客户端库设计的,每个连接只有一个线程 .

    DefaultConsumer的handleDelivery方法可以生成一个新的Runnable,它可以完成工作并在工作结束时确认消息 .

    这并不比简单地增加_134099更能给你带来更多的好处 - 唯一的区别是's a consumer for each thread, but there'并没有那么大的开销 .

    你可以做你想要的,但是,使用 ChannelAwareMessageListener 并将acknowledgeMode设置为 MANUAL - 然后你的监听器负责处理消息 .

    在2.0(明年)中,我们将有一个替代的侦听器容器,它将直接在客户端库线程上调用侦听器 . 但是,这是相当多的工作 . This (closed) pull request has an initial PoC但它还不是一个功能齐全的容器 .

相关问题