首页 文章

Python Kombu - 阻止

提问于
浏览
0

我正在使用kombu通过 生产环境 者/消费者模型管理RabbitMQ . 我启动了我的 生产环境 者,它在队列中放置了100个作业(我只有一个队列和一个交换) . 我想同时启动多个消费者,让每个消费者一次处理一个工作 . 不幸的是,消费者互相阻塞(即,当一个消费者从队列中抓取一份工作时,其他消费者只是闲置着) . 如果我杀死了工作消费者,那么其他一个消费者就会开始工作并开始工作 . 有没有办法让所有消费者同时运行,每个消费者从队列中处理不同的工作?我的消费者代码如下:

def start_consumer(self, incoming_exchange_name):
    if self.rabbitmq_connection.connected:
        callbacks=[]
        queues=[]

        callbacks.append(self._callback)
        queues.append(self.incoming_queue)

        print 'opening a new *incoming* rabbitmq connection to the %s exchange for the %s queue' % (self.incoming_exchange.name, self.incoming_queue.name)
        self.incoming_exchange(settings.rabbitmq_connection).declare()
        self.incoming_queue(settings.rabbitmq_connection).declare()

        with settings.rabbitmq_connection.Consumer(queues=queues, callbacks=callbacks) as consumer:
            while True:
                try:
                    self.rabbitmq_connection.drain_events()
                except Exception as e:
                    print 'Error -> %s' % e.message

2 回答

  • -1

    您需要将消费者预取设置为1(https://kombu.readthedocs.org/en/latest/reference/kombu.transport.pyamqp.html#kombu.transport.pyamqp.Connection.Channel.basic_qos),这样每个消费者只会抓取1条消息,并将其余消息保留在状态就绪的队列中,因此如果您有2个消费者且QOS设置为1且您有100您将处理2个同步任务的消息 .

    我已将缺少的部分添加到您的代码中,以设置预取计数

    def start_consumer(self, incoming_exchange_name):
    if self.rabbitmq_connection.connected:
        callbacks=[]
        queues=[]
    
        callbacks.append(self._callback)
        queues.append(self.incoming_queue)
    
        print 'opening a new *incoming* rabbitmq connection to the %s exchange for the %s queue' % (self.incoming_exchange.name, self.incoming_queue.name)
        self.incoming_exchange(settings.rabbitmq_connection).declare()
        self.incoming_queue(settings.rabbitmq_connection).declare()
    
        channel = self.rabbitmq_connection.channel()
        channel.basic_qos(prefetch_size=0, prefetch_count=1, a_global=False)
    
        with settings.rabbitmq_connection.Consumer(queues=queues, callbacks=callbacks, channel=channel) as consumer:
            while True:
                try:
                    self.rabbitmq_connection.drain_events()
                except Exception as e:
                    print 'Error -> %s' % e.message
    
  • 5

    我认为你基本上是想自己重写Celery:

    http://www.celeryproject.org/

    除非你纯粹是为了学习目的而做,否则不要自己痛苦并使用芹菜 . 顺便说一句, kombuRabbitMQ 恰恰是Celery用作后端的东西(更不用说Redis后端可用了,这在一些应用程序中节省了我无数的努力) .

相关问题