首页 文章

将芹菜任务路由到特定队列

提问于
浏览
5

我在服务器上运行了两个独立的celeryd进程,由 supervisor 管理 . 它们被设置为侦听单独的队列:

[program:celeryd1]
command=/path/to/celeryd --pool=solo --queues=queue1
...

[program:celeryd2]
command=/path/to/celeryd --pool=solo --queues=queue2
...

我的celeryconfig看起来像这样:

from celery.schedules import crontab

BROKER_URL = "amqp://guest:guest@localhost:5672//"

CELERY_DISABLE_RATE_LIMITS = True
CELERYD_CONCURRENCY = 1
CELERY_IGNORE_RESULT = True

CELERY_DEFAULT_QUEUE = 'default'
CELERY_QUEUES = {
    'default': {
        "exchange": "default",
        "binding_key": "default",
    },
    'queue1': {
        'exchange': 'queue1',
        'routing_key': 'queue1',
    },
    'queue2': {
        'exchange': 'queue2',
        'routing_key': 'queue2',
    },
}

CELERY_IMPORTS = ('tasks', )

CELERYBEAT_SCHEDULE = {
    'first-queue': {
        'task': 'tasks.sync',
        'schedule': crontab(hour=02, minute=00),
        'kwargs': {'client': 'client_1'},
        'options': {'queue': 'queue1'},
    },
    'second-queue': {
        'task': 'tasks.sync',
        'schedule': crontab(hour=02, minute=00),
        'kwargs': {'client': 'client_2'},
        'options': {'queue': 'queue1'},
    },
}

必须将所有 tasks.sync 任务路由到特定队列(因此芹菜进度) . 但当我尝试用 sync.apply_async(kwargs={'client': 'value'}, queue='queue1') 手动运行任务时,两个芹菜 Worker 都接受了这个任务 . 如何使任务路由到正确的队列,并且只能由绑定到队列的worker运行?

1 回答

  • 6

    你只运行一个celerybeat实例吗?

    也许你有旧的队列绑定与此冲突?尝试运行 rabbitmqctl list_queuesrabbitmqctl list_bindings ,可能会重置代理中的数据以从头开始 .

    你在这里的例子应该有用,并且在我试用它时正在为我工作 .

    提示:由于您使用的是与队列名相同的exchange和binding_key值,因此您无需在CELERY_QUEUES中明确列出它们 . 当CELERY_CREATE_MISSING_QUEUES打开时(默认情况下),如果您只是执行 celeryd -Q queue1 或将任务发送到未定义的队列,则将自动创建队列 .

相关问题