与问题的连续性:celery periodic tasks not executing
我用芹菜设置了django:
drf_project / drf_project / celery.py
from __future__ import absolute_import, unicode_literals
from celery import Celery
import os
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'drf_project.settings')
app = Celery('drf_project')
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks()
在drf_project / drf_project / init.py中
from __future__ import absolute_import, unicode_literals
from .celery import app as celery_app
__all__ = ['celery_app']
drf_project / user_management / tasks.py
from drf_project.celery import app
from time import strftime
@app.task
def print_test():
print strftime('%Y-%m-%d %H:%M:%S')
with open('abc.txt', 'ab+') as test_file:
test_file.writeline(strftime('%Y-%m-%d %H:%M:%S'))
在drf_project / drf_project / settings.py中
INSTALLED_APPS += ('django_celery_beat',)
CELERYBEAT_SCHEDULE = {
"test_1": {
"task": "tasks.print_test",
"schedule": timedelta(seconds=2),
},
}
我跑了两个芹菜 Worker 并用命令在不同的终端击败:
celery -A drf_project worker -l info -E
和
celery -A drf_project beat -l info -S django
节拍每2秒向 Worker 发送一次任务,如:
[2016-11-28 12:25:19,314:INFO / MainProcess]调度程序:发送到期任务导入联系人(print_test)
但 Worker 抛出的错误如下:
[2016-11-28 12:24:57,551:ERROR / MainProcess]收到类型为'print_test'的未注册任务 . 该消息已被忽略并被丢弃 . 你还记得导入包含这个任务的模块吗?或者你可能正在使用相对进口?有关更多信息,请参阅http://docs.celeryq.org/en/latest/internals/protocol.html . 消息体的全部内容是:u'[[],{},{“chord”:null,“callbacks”:null,“errbacks”:null,“chain”:null}]'(77b)Traceback(最近一次调用last):文件“/usr/local/lib/python2.7/dist-packages/celery/worker/consumer/consumer.py”,第549行,在on_task_received策略=策略[type_] KeyError:'print_test'
我可以在我的工作者应用程序中看到这样的任务:
[tasks]
. user_management.tasks.print_test
如何解决这个错误?
1 回答
尝试在CELERYBEAT_SCHEDULE中设置任务的绝对路径“user_management.tasks.print_test”
它应该如下所示: