我们使用Django,Celery和RabbitMQ来处理各种自动化工作者 . 为了监控所有这些,我们有一个用于分析和记录的兔子队列 . 这意味着每个自动化任务都应该能够将任务发送到分析队列 . 我们用“./manage.py芹菜 Worker ”运行 Worker ,我们的经纪人连接工作正常 . 工作人员可以完成所需的任务并完美地执行它,但在执行期间

log_event.delay()

突然它无法连接 . 通过挖掘amqp库和一些pdb.set_trace(),我们发现在任务执行期间,它试图连接到localhost而不是我们的rabbitmq服务器 . 以下是相关的经纪人设置:

local_settings.py

BROKER_URL = 'amqp://guest@localhost:5672/'

celeryconfig.py

import os
from kombu import Queue
from celery import Celery
from . import local_settings


os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'sample.settings.production')
app = Celery('api')

BROKER_URL = getattr(local_settings, 'BROKER_URL', 'amqp://guest@broker/')
CELERY_RESULT_BACKEND = 'amqp://'
CELERY_DEFAULT_QUEUE = 'default'

CELERY_TIMEZONE = 'Europe/London'
CELERY_ENABLE_UTC = True

CELERY_QUEUES = (
    Queue('default', routing_key='default'),
    Queue('queue1', routing_key='queue1'),
    Queue('queue2', routing_key='queue2'),
    Queue('queue3', routing_key='queue3'),
    Queue('queue4', routing_key='queue4'),
)

CELERY_ANNOTATIONS = {
    'appname.tasks.queue1_task': {'rate_limit': '200/m'},
    'appname.tasks.queue2_task': {'rate_limit': '200/m'},
    'appname.tasks.queue3_task': {'rate_limit': '200/m'},
    'appname.tasks.queue4_task': {'rate_limit': '200/m'},
}

CELERYD_CONCURRENCY = 10

CELERY_DEFAULT_EXCHANGE = 'tasks'
CELERY_DEFAULT_EXCHANGE_TYPE = 'topic'
CELERY_DEFAULT_ROUTING_KEY = 'default'


CELERY_ROUTES = {
    'appname.tasks.queue1': {
        'queue': 'queue1',
        'routing_key': 'queue1',
    },
    'appname.tasks.queue2': {
        'queue': 'queue2',
        'routing_key': 'queue2',
    },
    'appname.tasks.queue3': {
        'queue': 'queue3',
        'routing_key': 'queue3',
    },
    'appname.tasks.queue4': {
        'queue': 'queue4',
        'routing_key': 'queue4',
    },
}

CELERY_IGNORE_RESULT = True

tasks.py

from celery.task import task #We also tried shared_task, no success
@task(time_limit=120, queue="queue1")
def log_action_to_es(cls, analytic_dict):
    pass # Never gets this far