首页 文章

RabbitMQ / AMQP:单个队列,同一个消息的多个消费者?

提问于
浏览
97

我一般只是开始使用RabbitMQ和AMQP .

  • 我有一个消息队列

  • 我有多个消费者,我想用 same message 做不同的事情 .

大多数RabbitMQ文档似乎都专注于循环,即单个消费者使用单个消息,负载在每个消费者之间传播 . 这确实是我见证的行为 .

例如: 生产环境 者有一个队列,每2秒发送一次消息:

var amqp = require('amqp');
var connection = amqp.createConnection({ host: "localhost", port: 5672 });
var count = 1;

connection.on('ready', function () {
  var sendMessage = function(connection, queue_name, payload) {
    var encoded_payload = JSON.stringify(payload);  
    connection.publish(queue_name, encoded_payload);
  }

  setInterval( function() {    
    var test_message = 'TEST '+count
    sendMessage(connection, "my_queue_name", test_message)  
    count += 1;
  }, 2000) 


})

这是一个消费者:

var amqp = require('amqp');
var connection = amqp.createConnection({ host: "localhost", port: 5672 });
connection.on('ready', function () {
  connection.queue("my_queue_name", function(queue){
    queue.bind('#'); 
    queue.subscribe(function (message) {
      var encoded_payload = unescape(message.data)
      var payload = JSON.parse(encoded_payload)
      console.log('Recieved a message:')
      console.log(payload)
    })
  })
})

如果我两次启动消费者, I can see that each consumer is consuming alternate messages in round-robin behavior. Eg, I'll see messages 1, 3, 5 in one terminal, 2, 4, 6 in the other .

我的问题是:

  • 我可以让每个消费者收到相同的消息吗?即,两个消费者都得到消息1,2,3,4,5,6?在AMQP / RabbitMQ中这叫什么?它是如何正常配置的?

  • 这常见吗?我是否应该将消息交换路由到两个单独的队列中,而不是单个消费者?

11 回答

  • 0

    Can I have each consumer receive the same messages? Ie, both consumers get message 1, 2, 3, 4, 5, 6? What is this called in AMQP/RabbitMQ speak? How is it normally configured?

    不,如果消费者在同一个队列中,则不会 . 来自RabbitMQ的AMQP Concepts指南:

    重要的是要理解,在AMQP 0-9-1中,消息在消费者之间进行负载 balancer .

    这似乎意味着 round-robin behavior within a queue is a given ,而且不可配置 . 即,需要单独的队列以便由多个消费者处理相同的消息ID .

    Is this commonly done? Should I just have the exchange route the message into two separate queues, with a single consumer, instead?

    不,不是,每个消费者处理相同消息ID的单个队列/多个消费者是不可能的 . 将消息转换为两个单独的队列的交换路由确实更好 .

    由于我不需要太复杂的路由, fanout exchange 会很好地处理这个问题 . 我没有't focus too much on Exchanges earlier as node-amqp has the concept of a '默认交换'允许您直接向连接发布消息,但大多数AMQP消息都发布到特定交换 .

    这是我的粉丝交换,包括发送和接收:

    var amqp = require('amqp');
    var connection = amqp.createConnection({ host: "localhost", port: 5672 });
    var count = 1;
    
    connection.on('ready', function () {
      connection.exchange("my_exchange", options={type:'fanout'}, function(exchange) {   
    
        var sendMessage = function(exchange, payload) {
          console.log('about to publish')
          var encoded_payload = JSON.stringify(payload);
          exchange.publish('', encoded_payload, {})
        }
    
        // Recieve messages
        connection.queue("my_queue_name", function(queue){
          console.log('Created queue')
          queue.bind(exchange, ''); 
          queue.subscribe(function (message) {
            console.log('subscribed to queue')
            var encoded_payload = unescape(message.data)
            var payload = JSON.parse(encoded_payload)
            console.log('Recieved a message:')
            console.log(payload)
          })
        })
    
        setInterval( function() {    
          var test_message = 'TEST '+count
          sendMessage(exchange, test_message)  
          count += 1;
        }, 2000) 
     })
    })
    
  • 1

    刚读完rabbitmq tutorial . 您发布消息以进行交换,而不是排队;然后将其路由到适当的队列 . 在您的情况下,您应该为每个使用者绑定单独的队列 . 这样,他们就可以完全独立地使用消息 .

  • 12

    最后几个答案几乎是正确的 - 我有大量的应用程序生成需要最终与不同消费者的消息,因此过程非常简单 .

    如果您希望多个使用者收到同一条消息,请执行以下步骤 .

    为每个要接收消息的应用创建多个队列,在每个队列属性中,使用amq.direct交换“绑定”路由标记 . 更改发布应用程序以发送到amq.direct并使用路由标记(不是队列) . 然后,AMQP将使用相同的绑定将消息复制到每个队列中 . 作品谎言魅力:)

    示例:假设我有一个我生成的JSON字符串,我使用路由标记“new-sales-order”将其发布到“amq.direct”交换,我有一个用于打印订单的order_printer应用程序的队列,我有一个我的计费系统的队列,它将发送订单的副本并为客户开具发票,我有一个网络存档系统,我按照历史/合规性原因存档订单,我有一个客户端网络界面,其中订单被跟踪,其他信息来自于订单 .

    所以我的队列是:order_printer,order_billing,order_archive和order_tracking都绑定了绑定标签“new-sales-order”,所有4个都将获得JSON数据 .

    这是在没有发布应用程序知道或关心接收应用程序的情况下发送数据的理想方式 .

  • 12

    发送模式是一对一的关系 . 如果你想"send"到多个接收器,你应该使用pub / sub模式 . 有关详细信息,请参阅http://www.rabbitmq.com/tutorials/tutorial-three-python.html .

  • 1

    是的,每个消费者都可以收到相同的消息看看http://www.rabbitmq.com/tutorials/tutorial-three-python.html http://www.rabbitmq.com/tutorials/tutorial-four-python.html http://www.rabbitmq.com/tutorials/tutorial-five-python.html

    用于路由消息的不同方式 . 我知道它们适用于python和java,但它很好理解原理,决定你在做什么,然后找到如何在JS中做到这一点 . 听起来你想做一个简单的扇出(tutorial 3),它将消息发送到连接到交换机的所有队列 .

    与您正在做的事情以及您想要做的事情的不同之处主要在于您要设置和交换或输入扇出 . Fanout excahnges将所有消息发送到所有连接的队列 . 每个队列都有一个消费者,可以分别访问所有消息 .

    是的,这是常见的,它是AMPQ的功能之一 .

  • 3

    RabbitMQ / AMQP:单个队列,多个消费者,用于相同的消息和页面刷新 .

    rabbit.on('ready', function () {    });
        sockjs_chat.on('connection', function (conn) {
    
            conn.on('data', function (message) {
                try {
                    var obj = JSON.parse(message.replace(/\r/g, '').replace(/\n/g, ''));
    
                    if (obj.header == "register") {
    
                        // Connect to RabbitMQ
                        try {
                            conn.exchange = rabbit.exchange(exchange, { type: 'topic',
                                autoDelete: false,
                                durable: false,
                                exclusive: false,
                                confirm: true
                            });
    
                            conn.q = rabbit.queue('my-queue-'+obj.agentID, {
                                durable: false,
                                autoDelete: false,
                                exclusive: false
                            }, function () {
                                conn.channel = 'my-queue-'+obj.agentID;
                                conn.q.bind(conn.exchange, conn.channel);
    
                                conn.q.subscribe(function (message) {
                                    console.log("[MSG] ---> " + JSON.stringify(message));
                                    conn.write(JSON.stringify(message) + "\n");
                                }).addCallback(function(ok) {
                                    ctag[conn.channel] = ok.consumerTag; });
                            });
                        } catch (err) {
                            console.log("Could not create connection to RabbitMQ. \nStack trace -->" + err.stack);
                        }
    
                    } else if (obj.header == "typing") {
    
                        var reply = {
                            type: 'chatMsg',
                            msg: utils.escp(obj.msga),
                            visitorNick: obj.channel,
                            customField1: '',
                            time: utils.getDateTime(),
                            channel: obj.channel
                        };
    
                        conn.exchange.publish('my-queue-'+obj.agentID, reply);
                    }
    
                } catch (err) {
                    console.log("ERROR ----> " + err.stack);
                }
            });
    
            // When the visitor closes or reloads a page we need to unbind from RabbitMQ?
            conn.on('close', function () {
                try {
    
                    // Close the socket
                    conn.close();
    
                    // Close RabbitMQ           
                   conn.q.unsubscribe(ctag[conn.channel]);
    
                } catch (er) {
                    console.log(":::::::: EXCEPTION SOCKJS (ON-CLOSE) ::::::::>>>>>>> " + er.stack);
                }
            });
        });
    
  • 0

    要获得所需的行为,只需让每个使用者从其自己的队列中消耗 . 您必须使用非直接交换类型(主题, Headers ,扇出)才能立即将消息发送到所有队列 .

  • 0

    我评估你的情况是:

    • 我有一个消息队列(您接收消息的来源,我们将其命名为q111)

    • 我有多个消费者,我想做的不同有相同信息的事情 .

    这里你的问题是这个队列收到3条消息,消费者A消费消息1,其他消费者B和C消耗消息2和3.你需要一个设置,其中rabbitmq传递相同的副本所有这三个消息(1,2,3)同时发送给所有三个连接的消费者(A,B,C) .

    虽然可以通过许多配置来实现此目的,但一种简单的方法是使用以下两步概念:

    • 使用动态rabbitmq-shovel从所需队列中拾取消息(q111)并发布到扇出交换(专门为此目的创建并专用的交换) .

    • 现在重新配置您的消费者A,B和C(正在侦听队列(q111))直接使用每个消费者的独占和匿名队列从此Fanout交换机进行监听 .

    Note: While using this concept don't consume directly from the source queue(q111), as messages already consumed wont be shovelled to your Fanout exchange.

    如果您认为这不符合您的确切要求...随时发布您的建议:-)

  • 4

    如果您正好像我一样使用amqplib库,那么它们有handy examplePublish/Subscribe RabbitMQ tutorial实现,您可能会觉得它很方便 .

  • 79

    我认为你应该检查使用 fan-out 交换机发送你的消息 . 这样,您将收到针对不同消费者的相同消息,RabbitMQ正在为这些新消费者/订阅者中的每一个创建不同的队列 .

    这是在javascript https://www.rabbitmq.com/tutorials/tutorial-one-javascript.html中查看教程示例的链接

  • 5

    在这种情况下有一个有趣的选择我在答案中找不到 .

    您可以在一个消费者中使用“重新排队”功能来消息,以便在另一个消费者中处理这些消息 . 一般来说,这不是一种正确的方式,但也许它对某人来说足够好 .

    https://www.rabbitmq.com/nack.html

    并注意循环(当所有concumers nack重新排队消息时)!

相关问题