首页 文章

AMQP basic.get从队列中提取并发消费者

提问于
浏览
6

当使用RabbitMQ作为Message Broker时,我有一个场景,其中多个并发使用者使用basic.get AMQP方法从队列中提取消息,并使用显式确认来从队列中删除消息 . 假设以下设置

Q具有消息M1,M2,M3并且消费者C1,C2和C3(每个都具有其自己的连接和信道)连接到它 .

  • 如何在basic.get方法中处理并发?对basic.get方法的调用是否同步处理每个使用自己的连接和通道的并发使用者? C1,C2和C3发出basic.get调用以同时接收消息(假设服务器同时接收所有3个请求) .

  • C1使用basic.get请求消息并获取M1 . 当C2请求消息时,由于它使用不同的连接,它是否再次获得M1?

  • 消费者如何以预定义的大小批量提取消息?

3 回答

  • -1

    该方案不需要特殊配置 . 每个客户端都会以原子方式从队列中获取和接收一条消息,就像您希望的那样 .

  • 7

    你可能想读RabbitMQ Api guideintroduction to Amqp .

    首先,避免在消费者中使用 basicGet 消费消息 . 而是使用Consumer接口 basicConsume . 这允许RabbitMq在消息到达队列时向您推送消息 . 其他一切都是资源的一部分,因为它归结为繁忙的民意调查 .

    当使用 basicConsume 时,RabbitMq甚至会在后台向您推送更多消息,直至某个 prefetch 计数 . 这允许您同时处理多个消息,并最大限度地缩短等待下一条消息处理所需的时间(如果有消息可用) .

    并发不是一个问题,那就是你正在使用队列!当在一个队列上具有多个消费者时,消息将始终仅被传递给一个消费者(只要该消息被确认) . 否则,您需要为每个消费者提供专用队列,并相应地路由您的消息 .

    顺便说一句,如果你能够在消费者之间分享联系,你应该这样做 . 只需确保每个线程使用一个通道 .

  • 1

    您的问题确实是排队和流程理论的核心,所以我将从这个角度回答(就我的答案而言,RabbitMQ实际上是一个通用的消息代理,因为这适用于任何消息代理) .

    如何在basic.get方法中处理并发?对basic.get方法的调用是否同步处理每个使用自己的连接和通道的并发使用者? C1,C2和C3发出basic.get调用以同时接收消息(假设服务器同时接收所有3个请求) .

    Answer 1 :RabbitMQ旨在成为可靠的消息代理 . 它包含内部流程和控件,以确保相同的消息不会多次传递给不同的使用者 . 现在,由于测试您描述的场景的不切实际,它是否完美运行?谁知道 . 这就是为什么使用基于消息的体系结构的正确设计的应用程序将使用幂等事务,这样如果多次处理相同的事务,结果将与处理一次事务的结果相同 . Takeaway :设计您的应用程序,以便这个问题的答案不重要 .

    C1使用basic.get请求消息并获取M1 . 当C2请求消息时,由于它使用不同的连接,它是否再次获得M1?

    Answer 2 :否 . 根据我之前回答的假设,RabbitMQ经纪人一旦交付,将不再提供相同的消息 . 根据通道和队列的设置,消息可能会在交付时自动确认,并且永远不会被重新传递 . 其他设置将使消息在处理线程/通道的"death"处自动重新排队,或者从处理线程返回否定确认 . 这是一项重要的功能,因为如果可以为多个消费者提供服务,那么"poison"消息可能会在您的应用程序中反复肆虐 . Takeaway :您可以在设计应用程序时安全地依赖此假设 .

    消费者如何以预定义的大小批量提取消息?

    Answer :他们不能,也不会对他们有意义 . 在任何排队系统中,基本假设是在单个文件中从队列中删除项目 . 试图违反这一假设会导致不可预测的行为;此外,单件流通常是最有效的处理方法 . 但是,在现实世界中,有时需要批量> 1 . 在这种情况下,将批处理加载到其自己的单个消息中是有意义的,因此这可能需要一个单独的处理线程,该消息从队列中提取消息并将它们一起批处理,或者最初将它们批量放入 . 请记住,一旦有多个消费者,就没有办法保证按顺序处理单个消息 . Takeaway :应尽可能避免批处理,但在不切实际的地方应避免,您可能不会认为批次将包含任何特定顺序的单个邮件 .

相关问题