首页 文章

Python Pika - 消费者线程

提问于
浏览
3

我正在开发一个带有后台线程的Python应用程序,用于消费来自RabbitMQ队列的消息(主题场景) .

我在Button的on_click事件上启动线程 . 这是我的代码,请注意“#self.receive_command()” .

def on_click_start_call(self,widget):


    t_msg = threading.Thread(target=self.receive_command)
    t_msg.start()
    t_msg.join(0)
    #self.receive_command()


def receive_command(self):

    syslog.syslog("ENTERED")

    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
    syslog.syslog("1")

    channel = connection.channel()
    syslog.syslog("2")

    channel.exchange_declare(exchange='STORE_CMD', type='topic')
    syslog.syslog("3")

    result = channel.queue_declare(exclusive=True)
    syslog.syslog("4")

    queue_name = result.method.queue
    syslog.syslog("5")

    def callback_rabbit(ch,method,properties,body):
        syslog.syslog("RICEVUTO MSG: RKEY:"+method.routing_key+" MSG: "+body+"\n")

    syslog.syslog("6")

    channel.queue_bind(exchange='STORE_CMD', queue=queue_name , routing_key='test.routing.key')
    syslog.syslog("7")

    channel.basic_consume(callback_rabbit,queue=queue_name,no_ack=True)
    syslog.syslog("8")

    channel.start_consuming()

如果我运行此代码,我在syslog上看不到消息1,2,3,5,6,7,8但我只能看到“已输入” . 所以,代码被锁定在pika.BlokingConnection上 .

如果我运行相同的代码(注释线程指令并取消对函数的直接调用),则所有代码都按预期工作并正确接收消息 .

有任何解决方案可以将消费者运行到线程中吗?

提前致谢

达维德

2 回答

  • 7

    我已经在我的机器上测试了代码,并使用了最新版本的Pika . 它工作正常 . Pika存在线程问题,但只要您为每个线程创建一个连接,它就不应该成为问题 .

    如果您遇到问题,很可能是因为旧版Pika中的错误,或导致问题的线程无关的问题 .

    我建议您避免使用0.9.13,因为存在多个错误,但很快就会发布0.9.14 0.10.0 .

    [Edit] Pika 0.9.14已经发布 .

    这是我使用的代码 .

    def receive_command():
        print("ENTERED")
        connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
        print("1")
        channel = connection.channel()
        print("2")
        channel.exchange_declare(exchange='STORE_CMD', type='topic')
        print("3")
        result = channel.queue_declare(exclusive=True)
        print("4")
        queue_name = result.method.queue
        print("5")
        def callback_rabbit(ch,method,properties,body):
            print("RICEVUTO MSG: RKEY:"+method.routing_key+" MSG: "+body+"\n")
        print("6")
        channel.queue_bind(exchange='STORE_CMD', queue=queue_name , routing_key='test.routing.key')
        print("7")
        channel.basic_consume(callback_rabbit,queue=queue_name,no_ack=True)
        print("8")
        channel.start_consuming()
    
    def start():
        t_msg = threading.Thread(target=receive_command)
        t_msg.start()
        t_msg.join(0)
        #self.receive_command()
    start()
    
  • 1

    另一种方法是将线程方法 channel.start_consuming 传递给目标,然后将回调传递给 consume 方法 . 用法: consume(callback=your_method, queue=your_queue)

    import threading
    
    def consume(self, *args, **kwargs):
        if "channel" not in kwargs \
                or "callback" not in kwargs \
                or "queue" not in kwargs \
                or not callable(kwargs["callback"]):
            return None
    
        channel = kwargs["channel"]
        callback = kwargs["callback"]
        queue = kwargs["queue"]
        channel.basic_consume(callback, queue=queue, no_ack=True)
    
        t1 = threading.Thread(target=channel.start_consuming)
        t1.start()
        t1.join(0)
    

相关问题