首页 文章

RabbitMQ:快速的 生产环境 者和缓慢的消费者

提问于
浏览
14

我有一个应用程序使用RabbitMQ作为消息队列在两个组件之间发送/接收消息:发送方和接收方 . 发件人以非常快的方式发送消息 . 接收器接收消息然后执行一些非常耗时的任务(主要是针对非常大的数据大小的数据库写入) . 由于接收器需要很长时间才能完成任务,然后检索队列中的下一条消息,因此发送方将继续快速填满队列 . 所以我的问题是:这会导致消息队列溢出吗?

消息使用者如下所示:

public void onMessage() throws IOException, InterruptedException {
    channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
    String queueName = channel.queueDeclare("allDataCase", true, false, false, null).getQueue();
    channel.queueBind(queueName, EXCHANGE_NAME, "");

    QueueingConsumer consumer = new QueueingConsumer(channel);
    channel.basicConsume(queueName, true, consumer);

    while (true) {
        QueueingConsumer.Delivery delivery = consumer.nextDelivery();
        String message = new String(delivery.getBody());
        System.out.println(" [x] Received '" + message + "'");

        JSONObject json = new JSONObject(message);
        String caseID = json.getString("caseID");
        //following takes very long time            
        dao.saveToDB(caseID);
    }
}

消费者收到的每条消息都包含一个caseID . 对于每个caseID,它会将大量数据保存到数据库中,这需要很长时间 . 目前只为RabbitMQ设置了一个消费者,因为 生产环境 者/消费者使用相同的队列来发布/订阅caseID . 那么如何才能加快消费者吞吐量,以便消费者能够赶上 生产环境 者并避免队列中的消息溢出?我应该在消费者部分使用多线程来加快消费率吗?或者我应该使用多个消费者同时使用传入的消息?或者是否存在任何异步方式让消费者异步使用消息而不等待它完成?欢迎任何建议 .

5 回答

  • 1

    “这会导致消息队列溢出吗?”

    是 . 随着队列长度的增加,RabbitMQ将进入“流控制”状态,以防止过多的内存消耗 . 它还将开始将消息持久化到磁盘,而不是将它们保存在内存中 .

    “那么我怎样才能加快消费者的吞吐量,以便消费者能够赶上 生产环境 者并避免队列中的消息溢出”

    你有2个选择:

    • 添加更多消费者 . 请记住,如果选择此选项,您的数据库现在将被多个并发进程操纵 . 确保DB能承受额外的压力 .

    • 增加消费渠道的 QOS 值 . 这将从队列中提取更多消息并在消费者上缓冲它们 . 这将增加整体处理时间;如果缓冲了5条消息,则第5条消息将占用消息1 ... 5的处理时间 .

    “我应该在消费者部分使用多线程来加快消费率吗?”

    除非你有一个精心设计的解决方案 . 向应用程序添加并行性将在消费者方面增加大量开销 . 您最终可能会耗尽ThreadPool或限制内存使用量 .

    在处理AMQP时,您确实需要考虑每个流程的业务需求,以便设计最佳解决方案 . 您收到的消息对时间有多敏感?它们是否需要持久保存到DB ASAP,或者对您的用户是否重要,无论该数据是否立即可用?

    如果不需要立即持久存储数据,则可以修改应用程序,以便消费者只需从队列中删除消息并将其保存到Redis中的缓存集合中 . 引入第二个进程,然后按顺序读取和处理缓存的消息 . 这将确保您的队列长度不会充分增长以导致流量控制,同时防止您的数据库被写入请求轰炸,这通常比读取请求更昂贵 . 您的消费者现在只是从队列中删除消息,稍后由另一个进程处理 .

  • 1

    "So how can I speed up the consumer throughput so that the consumer can catch up with the producer and avoid the message overflow in the queue?"这是答案"use multiple consumers to consume the incoming message simultaneously",使用多线程并行运行这些消费者实现原则并没有共享,http://www.eaipatterns.com/CompetingConsumers.html

  • 0

    您有很多方法可以提高性能 .

    • 您可以使用更多 生产环境 者创建工作队列,这样您就可以创建一个简单的负载 balancer 系统 . 不要使用exchange ---> queue但只使用queue . 阅读这篇文章RabbitMQ Non-Round Robin Dispatching

    • 当您收到消息时,您可以创建一个用于在数据库中插入数据的池线程,但在这种情况下,您必须管理失败 .

    但我认为主要问题是数据库而不是RabbitMQ . 通过良好的调优,多线程和工作队列,您可以获得可扩展且快速的解决方案 .

    让我知道

  • 1

    虽然确实添加了更多的消费者可能会加快速度,但真正的问题将是保存到数据库中 .

    这里已经有很多答案谈论添加消费者(线程和/或机器)和改变QoS,所以我不打算重申这一点 . 相反,您应该认真考虑使用Aggregator模式将消息聚合到一组消息中,然后一次性将该组批量插入数据库 .

    每条消息的当前代码可能会打开一个连接,插入数据,然后关闭该连接(或返回池) . 更糟糕的是它甚至可能使用交易 .

    通过使用聚合器模式,您可以在刷新之前基本缓冲数据 .

    现在写一个好的聚合器很棘手 . 您需要决定如何缓冲(即每个工作人员都有自己的缓冲区或像Redis这样的中央缓冲区) . 我相信Spring集成有一个聚合器 .

  • 14

    作为答案我建议:两者 .

    您可以利用多个接收器,以及设置每个接收器以在单独的线程中执行任务,从而允许接收器接受队列中的下一个消息 .

    当然,这种方法假设每个操作的结果(如果我理解正确的话,在db上写入)不会以任何方式影响后续操作的结果以响应其他消息 .

相关问题