首页 文章

Flask与Celery集成

提问于
浏览
7

我想在我的Flask Example application中使用芹菜 . 因为我在Factory方法中创建实例,所以我不能使用文档中的示例(http://flask.pocoo.org/docs/0.10/patterns/celery/

init.py

from celery import Celery
from flask import Flask
from config import config


def create_app():
    app = Flask(__name__)
    app.debug = True
    app.config.from_object(config)

    from .main import main as main_blueprint
    app.register_blueprint(main_blueprint)


    return app

def make_celery(app = None):
    app = app or create_app()
    celery = Celery('app', backend=app.config['CELERY_RESULT_BACKEND'], broker=app.config['CELERY_BROKER_URL'])
    celery.conf.update(app.config)
    TaskBase = celery.Task
    class ContextTask(TaskBase):
        abstract = True
        def __call__(self, *args, **kwargs):
            with app.app_context():
                return TaskBase.__call__(self, *args, **kwargs)
    celery.Task = ContextTask
    return celery

tasks.py

from app import make_celery

celery = make_celery()


@celery.task
def add(a, b):
    return a + b

views.py

from flask import render_template
from app.main import main
from ..tasks import add

@main.route('/', methods=['GET', 'POST'])
def index():
    add.delay(5, 3)
    return render_template('index.html')

我得到了error

$  celery -A app.tasks worker
Traceback (most recent call last):
  File "...lib/python3.4/site-packages/celery/app/utils.py", line 229, in find_app
    sym = symbol_by_name(app, imp=imp)
  File "...lib/python3.4/site-packages/celery/bin/base.py", line 488, in symbol_by_name
    return symbol_by_name(name, imp=imp)
  File "...lib/python3.4/site-packages/kombu/utils/__init__.py", line 97, in symbol_by_name
    return getattr(module, cls_name) if cls_name else module
AttributeError: 'module' object has no attribute 'tasks'

1 回答

相关问题