首页 文章

Python django rabbitmq celery导入任务的问题

提问于
浏览
2

欢迎...我'm creating a project where I parse xlsx files with xlrd library. Everything works just fine. Then I configured RabbitMQ and Celery. Created some tasks in main folder which works and can be accessed from iPython. The problems starts when I' m在我的应用程序中(在我的项目中及时创建的应用程序)我尝试从我的应用程序中导入任务在我的views.py中我试图用所有可能的路径导入它但每次它都会抛出一个错误 . 官方文档发布了从其他应用程序导入任务的正确方法,它看起来像这样: from project.myapp.tasks import mytask 但它根本不起作用 . 此外,当我在iPython中我可以使用命令 from tango.tasks import add 导入任务并且它完美地工作 .

只是吼我正在上传我的文件和错误由控制台打印出来 .

views.py

# these are the instances that I was trying to import that seemed to be the most reasonable, but non of it worked
# import tasks
# from new_tango_project.tango.tasks import add
# from new_tango_project.tango import tasks
# from new_tango_project.new_tango_project.tango.tasks import add
# from new_tango_project.new_tango_project.tango import tasks
# from tango import tasks

#function to parse files
def parse_file(request, file_id):
    xlrd_file = get_object_or_404(xlrdFile, pk = file_id)
    if xlrd_file.status == False
        #this is some basic task that I want to enter to
        tasks.add.delay(321,123)

settings.py

#I've just posted things directly connected to celery
import djcelery
INSTALLED_APPS = (
    'django.contrib.admin',
    'django.contrib.auth',
    'django.contrib.contenttypes',
    'django.contrib.sessions',
    'django.contrib.messages',
    'django.contrib.staticfiles',
    'tango',
    'djcelery',
    'celery',
)

BROKER_URL = "amqp://sebrabbit:seb@localhost:5672/myvhost"
BROKER_HOST = "127.0.0.1"
BROKER_PORT = 5672
BROKER_VHOST = "myvhost"
BROKER_USER = "sebrabbit"
BROKER_PASSWORD = "seb"
CELERY_RESULT_BACKEND = 'amqp://'
CELERY_TASK_SERIALIZER = 'json'
CELERY_ACCEPT_CONTENT=['json']
CELERY_TIMEZONE = 'Europe/Warsaw'
CELERY_ENABLE_UTC = False

celery.py(在我的主文件夹 new_tango_project 中)

from __future__ import absolute_import
import os
from celery import Celery
import djcelery
from django.conf import settings

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'new_tango_project.settings')

app = Celery('new_tango_project')

app.config_from_object('django.conf:settings')
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)

# CELERY_IMPORTS = ['tango.tasks']
# Optional configuration, see the application user guide.
app.conf.update(
    CELERY_TASK_RESULT_EXPIRES=3600,
    CELERY_RESULT_BACKEND='djcelery.backends.cache:CacheBackend',
)

if __name__ == '__main__':
    app.start()

tasks.py(在我的主项目文件夹 new_tango_project 中)

from __future__ import absolute_import
from celery import Celery
from celery.task import task

app = Celery('new_tango_project',
             broker='amqp://sebrabbit:seb@localhost:5672/myvhost',
             backend='amqp://',
             include=['tasks'])

@task
def add(x, y):
    return x + y


@task
def mul(x, y):
    return x * y


@task
def xsum(numbers):
    return sum(numbers)

@task
def parse(file_id, xlrd_file):

    return "HAHAHAHHHAHHA"

在我的应用程序文件夹中的tasks.py

from __future__ import absolute_import
from celery import Celery
from celery.task import task    
#
app = Celery('tango')

@task
def add(x, y):
    return x + y

@task
def asdasdasd(x, y):
    return x + y

芹菜控制台启动时

-------------- celery@debian v3.1.17 (Cipater)
---- **** ----- 
--- * ***  * -- Linux-3.2.0-4-amd64-x86_64-with-debian-7.8
-- * - **** --- 
- ** ---------- [config]
- ** ---------- .> app:         new_tango_project:0x1b746d0
- ** ---------- .> transport:   amqp://sebrabbit:**@localhost:5672/myvhost
- ** ---------- .> results:     amqp://
- *** --- * --- .> concurrency: 8 (prefork)
-- ******* ---- 
--- ***** ----- [queues]
 -------------- .> celery           exchange=celery(direct) key=celery

最后我的控制台日志......

[2015-02-20 11:19:45,678: ERROR/MainProcess] Received unregistered task of type 'new_tango_project.tasks.add'.
The message has been ignored and discarded.

Did you remember to import the module containing this task?
Or maybe you are using relative imports?
Please see http://bit.ly/gLye1c for more information.

The full contents of the message body was:
{'utc': True, 'chord': None, 'args': (123123123, 123213213), 'retries': 0, 'expires': None, 'task': 'new_tango_project.tasks.add', 'callbacks': None, 'errbacks': None, 'timelimit': (None, None), 'taskset': None, 'kwargs': {}, 'eta': None, 'id': 'd9a8e560-1cd0-491d-a132-10345a04f391'} (233b)
Traceback (most recent call last):
  File "/home/seb/PycharmProjects/tango/local/lib/python2.7/site-packages/celery/worker/consumer.py", line 455, in on_task_received
    strategies[name](message, body,
KeyError: 'new_tango_project.tasks.add'

这是导入任务的众多尝试之一的日志 .

我在哪里弄错了?

最好的祝愿

2 回答

  • 0

    Hint 1: 在您的所有 tasks.py 中,您将Celery应用程序声明为 app = Celery(...) 但是您没有指定任务应该在任务装饰器中附加到哪个应用程序 .

    尝试将 @task 更改为 @app.task 并查看是否有效 .

    Hint 2: 为什么每个 tasks.py 都需要创建一个新的Celery应用程序?为什么不用 from new_tango_project.celery import app 导入一个主Celery应用程序然后用 @app.task 声明你的任务?

    Hint 3: 一旦定义了任务(可能在应用程序的 celery.pytasks.py 中),只需执行

    from new_tango_project.celery import add
    from my_app.tasks import add_bis
    
    def my_view(request):
        ...
        add.delay(*your_params)   # using the task from your celery.py
        add_bis.delay(*your_params) # your task from the application
    
  • 0

    我想知道你是如何开始你的芹菜 Worker 的 . 我遇到过这一次,因为我没有正确启动工作人员:执行“celery worker -l info”时应添加-A选项,以便芹菜将连接到您在Celery Obj中配置的代理 . 否则芹菜将尝试连接默认代理 .

相关问题