我正在开发一个应用程序,我必须定期从位于我的项目目录中的celery.py文件中调用myapp / task.py中的函数 . 但是当我从celery.py文件中执行import语句时
from myapp.tasks import test
我收到以下错误,我无法解决:
django.core.exceptions.AppRegistryNotReady: Apps aren't loaded yet.
以下是init.py中的代码:
from __future__ import absolute_import, unicode_literals
from .celery import app as celery_app
__all__ = ['celery_app']
来自celery.py的代码
from __future__ import absolute_import,unicode_literals
import os
from celery import Celery
from celery.schedules import crontab
from myapp.tasks import test
os.environ.setdefault('DJANGO_SETTINGS_MODULE','project.settings')app = Celery('project')
app.config_from_object( 'django.conf:设置',命名空间= '芹菜')
app.autodiscover_tasks()
@ app.on_after_configure.connect
def setup_monitor_call(发件人,** kwargs):
sender.add_periodic_task(30.0,test.s(),name='call test every 30 seconds')
现在在我的app目录中有一个名为tasks.py的文件,其中包含我从celery.py调用的测试方法:
from __future__ import absolute_import, unicode_literals
from celery.decorators import task
from celery import shared_task
from celery.utils.log import get_task_logger
logger=get_task_logger(__name__)
def test():
print("Hello Celery")
现在确切的问题是,当我在celery.py文件中编写以下import语句时:
from myapp.tasks import test
然后当我运行python manage.py runserver时,我得到错误:
django.core.exceptions.AppRegistryNotReady: Apps aren't loaded yet.
我删除import语句的那一刻,服务器正确运行 . 我弄不清楚为什么?
1 回答
此错误是由于与该导入不兼容,例如循环导入 . 我最近与芹菜合作开展了一个项目,有时进口也像纸牌屋一样脆弱 .
我建议您在此示例中按如下方式调用任务: