欢迎...我'm creating a project where I parse xlsx files with xlrd library. Everything works just fine. Then I configured RabbitMQ and Celery. Created some tasks in main folder which works and can be accessed from iPython. The problems starts when I' m在我的应用程序中(在我的项目中及时创建的应用程序)我尝试从我的应用程序中导入任务在我的views.py中我试图用所有可能的路径导入它但每次它都会抛出一个错误 . 官方文档发布了从其他应用程序导入任务的正确方法,它看起来像这样: from project.myapp.tasks import mytask
但它根本不起作用 . 此外,当我在iPython中我可以使用命令 from tango.tasks import add
导入任务并且它完美地工作 .
只是吼我正在上传我的文件和错误由控制台打印出来 .
views.py
# these are the instances that I was trying to import that seemed to be the most reasonable, but non of it worked
# import tasks
# from new_tango_project.tango.tasks import add
# from new_tango_project.tango import tasks
# from new_tango_project.new_tango_project.tango.tasks import add
# from new_tango_project.new_tango_project.tango import tasks
# from tango import tasks
#function to parse files
def parse_file(request, file_id):
xlrd_file = get_object_or_404(xlrdFile, pk = file_id)
if xlrd_file.status == False
#this is some basic task that I want to enter to
tasks.add.delay(321,123)
settings.py
#I've just posted things directly connected to celery
import djcelery
INSTALLED_APPS = (
'django.contrib.admin',
'django.contrib.auth',
'django.contrib.contenttypes',
'django.contrib.sessions',
'django.contrib.messages',
'django.contrib.staticfiles',
'tango',
'djcelery',
'celery',
)
BROKER_URL = "amqp://sebrabbit:seb@localhost:5672/myvhost"
BROKER_HOST = "127.0.0.1"
BROKER_PORT = 5672
BROKER_VHOST = "myvhost"
BROKER_USER = "sebrabbit"
BROKER_PASSWORD = "seb"
CELERY_RESULT_BACKEND = 'amqp://'
CELERY_TASK_SERIALIZER = 'json'
CELERY_ACCEPT_CONTENT=['json']
CELERY_TIMEZONE = 'Europe/Warsaw'
CELERY_ENABLE_UTC = False
celery.py(在我的主文件夹 new_tango_project
中)
from __future__ import absolute_import
import os
from celery import Celery
import djcelery
from django.conf import settings
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'new_tango_project.settings')
app = Celery('new_tango_project')
app.config_from_object('django.conf:settings')
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
# CELERY_IMPORTS = ['tango.tasks']
# Optional configuration, see the application user guide.
app.conf.update(
CELERY_TASK_RESULT_EXPIRES=3600,
CELERY_RESULT_BACKEND='djcelery.backends.cache:CacheBackend',
)
if __name__ == '__main__':
app.start()
tasks.py(在我的主项目文件夹 new_tango_project
中)
from __future__ import absolute_import
from celery import Celery
from celery.task import task
app = Celery('new_tango_project',
broker='amqp://sebrabbit:seb@localhost:5672/myvhost',
backend='amqp://',
include=['tasks'])
@task
def add(x, y):
return x + y
@task
def mul(x, y):
return x * y
@task
def xsum(numbers):
return sum(numbers)
@task
def parse(file_id, xlrd_file):
return "HAHAHAHHHAHHA"
在我的应用程序文件夹中的tasks.py
from __future__ import absolute_import
from celery import Celery
from celery.task import task
#
app = Celery('tango')
@task
def add(x, y):
return x + y
@task
def asdasdasd(x, y):
return x + y
芹菜控制台启动时
-------------- celery@debian v3.1.17 (Cipater)
---- **** -----
--- * *** * -- Linux-3.2.0-4-amd64-x86_64-with-debian-7.8
-- * - **** ---
- ** ---------- [config]
- ** ---------- .> app: new_tango_project:0x1b746d0
- ** ---------- .> transport: amqp://sebrabbit:**@localhost:5672/myvhost
- ** ---------- .> results: amqp://
- *** --- * --- .> concurrency: 8 (prefork)
-- ******* ----
--- ***** ----- [queues]
-------------- .> celery exchange=celery(direct) key=celery
最后我的控制台日志......
[2015-02-20 11:19:45,678: ERROR/MainProcess] Received unregistered task of type 'new_tango_project.tasks.add'.
The message has been ignored and discarded.
Did you remember to import the module containing this task?
Or maybe you are using relative imports?
Please see http://bit.ly/gLye1c for more information.
The full contents of the message body was:
{'utc': True, 'chord': None, 'args': (123123123, 123213213), 'retries': 0, 'expires': None, 'task': 'new_tango_project.tasks.add', 'callbacks': None, 'errbacks': None, 'timelimit': (None, None), 'taskset': None, 'kwargs': {}, 'eta': None, 'id': 'd9a8e560-1cd0-491d-a132-10345a04f391'} (233b)
Traceback (most recent call last):
File "/home/seb/PycharmProjects/tango/local/lib/python2.7/site-packages/celery/worker/consumer.py", line 455, in on_task_received
strategies[name](message, body,
KeyError: 'new_tango_project.tasks.add'
这是导入任务的众多尝试之一的日志 .
我在哪里弄错了?
最好的祝愿
2 回答
Hint 1: 在您的所有
tasks.py
中,您将Celery应用程序声明为app = Celery(...)
但是您没有指定任务应该在任务装饰器中附加到哪个应用程序 .尝试将
@task
更改为@app.task
并查看是否有效 .Hint 2: 为什么每个
tasks.py
都需要创建一个新的Celery应用程序?为什么不用from new_tango_project.celery import app
导入一个主Celery应用程序然后用@app.task
声明你的任务?Hint 3: 一旦定义了任务(可能在应用程序的
celery.py
和tasks.py
中),只需执行我想知道你是如何开始你的芹菜 Worker 的 . 我遇到过这一次,因为我没有正确启动工作人员:执行“celery worker -l info”时应添加-A选项,以便芹菜将连接到您在Celery Obj中配置的代理 . 否则芹菜将尝试连接默认代理 .