首页 文章

Spring amqp批处理接收消息

提问于
浏览
3

我们使用Spring AMQP从RabbitMQ读取消息,现在我们一次只读取一条消息离开队列,无论如何我可以从队列中读取多条消息然后处理批处理?

我看到Spring中有一个BatchingStrategy,我怎样才能将那个插入connectionFactory?

这是我的代码:

CachingConnectionFactory connectionFactory = new CachingConnectionFactory(host, port);
        connectionFactory.setUsername(username);
        connectionFactory.setPassword(password);

SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        JsonMessageConverter converter = new JsonMessageConverter();
        DefaultClassMapper defaultClassMapper = new DefaultClassMapper();
        defaultClassMapper.setDefaultType(Message.class);
        converter.setClassMapper(defaultClassMapper);
        factory.setMessageConverter(converter);
        factory.setConcurrentConsumers(3);

...

public class Processor implements IChannelProcessor {
@Override
    public void process(Message message) {
        validateMessageEvent(message);
        // process the message

1 回答

  • 3

    BatchingStrategy 用于将多个消息段放入单个amqp消息中;容器会自动声明此类消息 . 它对你的目的无济于事 .

    做你想做的事;而不是使用POJO消息传递( @RabbitListener ),您将不得不使用 acknowledgeMode 设置为 MANUALChannelAwareMessageListener 的消息侦听器容器 .

    处理完批量消息后,请在通道上调用 basicAck 以确认批次中的所有消息 .

相关问题