我正在使用芹菜3.0.20,我很难让芹菜任务与不同的队列一起工作 .
我的芹菜 Worker 使用队列配置启动
-Q:w1 default -Q:w2 longrunning
Django设置包含以下celery配置:
CELERY_QUEUES = (
Queue('default', Exchange('default'), routing_key='default'),
Queue('longrunning', Exchange('longrunning'), routing_key='longrunning'),
)
CELERY_DEFAULT_QUEUE = 'default'
CELERY_DEFAULT_EXCHANGE_TYPE = 'direct'
CELERY_DEFAULT_ROUTING_KEY = 'default'
CELERY_ROUTES = ({'mytasks.task_a.run': {
'queue': 'longrunning',
'routing_key': 'longrunning'
}},
到现在为止还挺好 . 默认情况下,所有任务都在 default
队列中, task_a
进入长时间运行的队列 .
task_a
模块实现如下:
from celery import task
@task
def run():
# do some task work
现在,我遇到的问题是,一个不同的任务被实现为继承自 celery.Task
的类:
from celery import Task
class AnotherTask(Task):
def run(self, *args, **kwargs):
# do some task work
当 AnotherTask
类现在位于 task_b
模块中时,我无法在 longrunning
队列中执行此任务:我尝试将其添加到 CELERY_ROUTES
中,不同的类别,没有一个工作:
{'mytasks.task_b': {
'queue': 'longrunning',
'routing_key': 'longrunning'
}}
{'mytasks.task_b.AnotherTask': {
'queue': 'longrunning',
'routing_key': 'longrunning'
}}
{'mytasks.task_b.AnotherTask.run': {
'queue': 'longrunning',
'routing_key': 'longrunning'
}}
我也尝试切换到默认交换类型 'topic'
,但这也不起作用 .
任何提示如何让 AnotherTask
类中的任务在 longrunning
队列中执行?
1 回答
好的,我发现了问题 .
这是因为所有其他任务最终都在
longrunning
队列中,并且定期与celerybeat一起开始 . 新任务是从Web环境开始的 . 并且celerybeat设置导入了包含CELERY_ROUTES
dict的设置,但Web环境设置没有 . 啊!