首页 文章

RabbitMQ - 消息传递顺序

提问于
浏览
41

我需要为我的新项目选择一个新的Queue代理 .

这次我需要一个支持pub / sub的可伸缩队列,并且必须保持消息排序 .

我读过亚历克西斯的评论:他写道:

“事实上,我们认为RabbitMQ比Kafka提供更强的订购”

我在rabbitmq docs中阅读了消息排序部分:

“消息可以使用具有重新排队参数(basic.recover,basic.reject和basic.nack)的AMQP方法返回到队列,或者由于在保留未确认的消息时关闭通道...使用版本2.7.0和如果队列有多个订阅者,个别消费者仍然可能无序地观察消息 . 这是由于其他订阅者可能会重新排队消息的行为 . 从队列的角度来看,消息总是保存在发布顺序中“ .

如果我需要按订单处理消息,我只能使用带有独占队列的rabbitMQ给每个消费者吗?

RabbitMQ仍然被认为是有序消息排队的一个很好的解决方案吗?

4 回答

  • 1

    好吧,让's take a closer look at the scenario you are describing above. I think it'重要的是在你问题的片段之前立即粘贴the documentation以提供上下文:

    AMQP 0-9-1核心规范的第4.7节解释了保证订购的条件:在一个信道中发布的消息,通过一个交换机,一个队列和一个输出信道将按照它们发送的相同顺序接收 . 自2.7.0发布以来,RabbitMQ提供更强大的保障 . 可以使用具有重新排队参数(basic.recover,basic.reject和basic.nack)的AMQP方法将消息返回到队列,或者由于在保留未确认消息时关闭通道 . 任何这些情况都会导致消息在早于2.7.0的RabbitMQ版本的队列后面重新排队 . 从RabbitMQ版本2.7.0开始,即使存在重新排队或通道关闭,消息也始终按发布顺序保存在队列中 . (重点补充)

    因此,很明显,从2.7.0开始,RabbitMQ在消息排序方面对原始AMQP规范进行了相当大的改进 .

    With multiple (parallel) consumers, order of processing cannot be guaranteed.
    第三段(粘贴在问题中)继续发表免责声明,我将解释:"if you have multiple processors in the queue, there is no longer a guarantee that messages will be processed in order."他们在这里说的是RabbitMQ不能违反数学定律 .

    考虑银行的一系列客户 . 这家银行以帮助客户进入银行的顺序而自豪 . 客户排队等候,由3个可用出纳员中的下一个提供服务 .

    今天早上,所有三名柜员同时出现,接下来的三位顾客接近了 . 突然之间,三名计票员中的第一名患者病情严重,无法完成服务第一位客户 . 到这时,出纳员2完成了客户2,出纳员3已经开始为客户3服务 .

    现在,有两件事情可能发生 . (1)第一个在线客户可以返回到 生产环境 线头部或(2)第一个客户可以抢先第三个客户,导致该柜员停止对第三个客户工作并开始第一个工作 . RabbitMQ不支持这种类型的抢占逻辑,也不支持我所知道的任何其他消息代理 . 在任何一种情况下,第一个客户实际上并没有最终获得帮助 - 第二个客户确实如此,幸运地获得了一个好的,快速的出纳员 . 保证客户的唯一方法是帮助订购一个柜员一次一个,这将导致银行的主要客户服务问题 .

    我希望这有助于说明您所询问的问题 . 鉴于您有多个消费者,不可能确保在每种可能的情况下按顺序处理消息 . 如果你有多个队列,多个独家消费者,不同的经纪人等,这没关系 - 没有办法保证先前消息是按照多个消费者的顺序回答的 . 但RabbitMQ将尽最大努力 .

  • 103

    消息排序在Kafka中保留,但仅在分区内而不是全局内 . 如果您的数据需要全局排序和分区,这确实会让事情变得困难 . 但是,如果您只是需要确保同一个用户的所有相同事件......最终都在同一个分区中,以便正确排序,您可以这样做 . 生产环境 者负责他们写入的分区,因此如果您能够对数据进行逻辑分区,则这可能更为可取 .

  • 0

    我认为这个问题有两个不相似的,消费订单和处理订单 .

    消息队列可以 - 在一定程度上 - 保证消息将按顺序消耗,但是,它们不能为您的处理顺序提供任何保证 .

    这里的主要区别在于消息处理的某些方面无法确定消费时间,例如:

    • 如上所述,消费者在处理时可能会失败,这里消息的消费顺序是正确的,但是,消费者未能正确处理它,这将使其返回队列,直到现在消费订单仍然完好但我们不知道我不知道现在处理顺序如何

    • 如果通过“处理”我们的意思是现在丢弃消息并完成处理,那么考虑处理时间不是线性的情况,换句话说处理一个消息比另一个消息花费更长时间,因此如果消息3在处理中花费更长时间与预期相比,消息4和5可能会被消耗并在消息3之前完成处理

    因此,即使您设法将消息返回到队列的前面(这违反了消耗顺序),您仍然无法保证在下一条消息之前的所有消息都已完成处理 .

    如果您想确保处理订单,那么:

    • 始终只有1个消费者实例

    • 或者不要使用消息传递队列并使用同步阻塞方法进行处理,这可能听起来很糟糕,但在许多情况下,业务要求完全有效,有时甚至是关键的

  • 6

    有适当的方法可以保证RabbitMQ订阅中的消息顺序 .

    如果您使用多个使用者,他们将使用共享 ExecutorService 处理该消息 . 另见 ConnectionFactory.setSharedExecutor(...) . 你可以设置一个 Executors.newSingleThreadExecutor() .

    如果将一个 Consumer 与一个队列一起使用,则可以使用多个bindingKeys绑定此队列(它们可能具有通配符) . 消息将按照消息代理接收的顺序放入队列 .

    例如,您有一个发布者,它发布订单很重要的消息:

    try (Connection connection2 = factory.newConnection();
            Channel channel2 = connection.createChannel()) {
        // publish messages alternating to two different topics
        for (int i = 0; i < messageCount; i++) {
            final String routingKey = i % 2 == 0 ? routingEven : routingOdd;
            channel2.basicPublish(exchange, routingKey, null, ("Hello" + i).getBytes(UTF_8));
        }
    }
    

    您现在可能希望以与发布它们相同的顺序从队列中的两个主题接收消息:

    // declare a queue for the consumer
    final String queueName = channel.queueDeclare().getQueue();
    
    // we bind to queue with the two different routingKeys
    final String routingEven = "even";
    final String routingOdd = "odd";
    channel.queueBind(queueName, exchange, routingEven);
    channel.queueBind(queueName, exchange, routingOdd);
    channel.basicConsume(queueName, true, new DefaultConsumer(channel) { ... });
    

    Consumer 现在将按照发布的顺序接收消息,无论您使用不同的主题 .

    RabbitMQ文档中有一些很好的5分钟教程可能会有所帮助:https://www.rabbitmq.com/tutorials/tutorial-five-java.html

相关问题