我在服务器A上使用Celery和Rabbitmq代理 . 有些任务需要与另一台服务器交互,比如服务器B和我正在使用Rabbitmq队列进行此交互 .

Queue 1 - 服务器A( 生产环境 者),服务器B(消费者)

Queue 2 - 服务器B( 生产环境 者),服务器A(消费者)

我的芹菜意外地挂了,我发现了服务器A消费者代码的错误实现的原因 .

channel.start_consuming() 按预期继续轮询Rabbitmq,但是将其放入芹菜任务会创建多个不会过期的轮询器 . 我可以添加到期但是无法保证 data 发送到服务器B的时间完成 . 下面粘贴的代码是我用来解决这个问题的一种方法,但我不相信这是最好的解决方案 .

我想知道我做错了什么,以及实现这个的正确方法是什么,因为我没有在网上搜索文章 . 任何提示,见解甚至文章链接都将非常有帮助 .

最后,我的代码 -

@celery.task
def task_a(data):
    do_some_processing
    # Create only 1 Rabbitmq consumer instance to avoid celery hangups
    task_d.delay()

@celery.task
def task_b(data):
    do_some_processing
    if data is not None:
        task_c.delay()

@celery.task
def task_c():
    data = some_data
    data = json.dumps(data)
    conn_params = pika.ConnectionParameters(host=RABBITMQ_HOST)
    connection = pika.BlockingConnection(conn_params)
    channel = connection.channel()
    channel.queue_declare(queue=QUEUE_1)
    channel.basic_publish(exchange='',
                        routing_key=QUEUE_1,
                        body=data)
    channel.close()

@celery.task
def task_d():
    def queue_helper(ch, method, properties, body):
        '''
        Callback from queue.
        '''
        data = json.loads(body)
        task_b.delay(data)

    conn_params = pika.ConnectionParameters(host=RABBITMQ_HOST)
    connection = pika.BlockingConnection(conn_params)
    channel = connection.channel()
    channel.queue_declare(queue=QUEUE_2)
    channel.basic_consume(queue_helper,
                        queue=QUEUE_2,
                        no_ack=True)
    channel.start_consuming()
    channel.close()