首页 文章

RabbitMQ:多个消息和单个消费者

提问于
浏览
1

我有RabbitMQ消费者的问题 . 实际上我有一个消费者从三个队列中发送消息 . 问题是我需要从每个消息中获取多条消息,但我的消费者每个队列只获得一个消息并且结束了 . 如果有人能帮助我解决这个问题,我将不胜感激 .

消费者代码如下

for (int i = 0; i < queueNames.size(); i++) {

        Channel channel = connection.createChannel();
        QueueingConsumer consumer = new QueueingConsumer(channel);
        channel.basicConsume(queueNames.get(i).toString(), true, consumer_tag, consumer);

        flag = true;
        while (flag) {

            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String routingKey = delivery.getEnvelope().getRoutingKey();
            System.out.println(routingKey);
            String message = new String(delivery.getBody(), "UTF-8");

                flag = false;
        }
    }

其中queueNames是一个包含我的队列名称的列表(数量为3) .

2 回答

  • 2

    您需要订阅队列,消费者将只按照您定义的方式使用1条消息

    boolean autoAck = false;
    channel.basicConsume(queueName, autoAck, "myConsumerTag",
     new DefaultConsumer(channel) {
         @Override
         public void handleDelivery(String consumerTag,
                                    Envelope envelope,
                                    AMQP.BasicProperties properties,
                                    byte[] body)
             throws IOException
         {
             String routingKey = envelope.getRoutingKey();
             String contentType = properties.getContentType();
             long deliveryTag = envelope.getDeliveryTag();
             // (process the message components here ...)
             channel.basicAck(deliveryTag, false);
         }
     });
    

    更多信息:https://www.rabbitmq.com/api-guide.html

  • 1

    好的,我这样解决问题:

    boolean flag;
        System.out.println("Rozmiar queue " + queueNames.size());
        for (int i = 0; i < queueNames.size(); i++) {
    
            Channel channel = connection.createChannel();
            QueueingConsumer consumer = new QueueingConsumer(channel);
            channel.basicConsume(queueNames.get(i).toString(), true, consumer_tag, consumer);
    
            flag = true;
            while (flag) {
    
                QueueingConsumer.Delivery delivery = consumer.nextDelivery(timeout);
                if (delivery == null) {
                    flag = false;
                } else {
    
                    String message = new String(delivery.getBody(), "UTF-8");
                    System.out.println(" [x] Message Received '" + message + "'");
                }
            }
        }
    

    我希望这个解决方案可以帮助将来的人:)

相关问题