首页 文章

带有create_app,SQLAlchemy和Celery的Flask

提问于
浏览
16

我真的很难为Flask,SQLAlchemy和Celery进行适当的设置 . 我已经广泛搜索并尝试了不同的方法,似乎没有什么工作 . 要么我错过了应用程序上下文,要么无法运行工作程序,或者存在其他一些问题 . 结构非常通用,因此我可以构建更大的应用程序 .

我正在使用:Flask 0.10.1,SQLAlchemy 1.0,Celery 3.1.13,我目前的设置如下:

应用程序/ __ init__.py

#Empty

应用程序/ config.py

import os
basedir = os.path.abspath(os.path.dirname(__file__))

class Config:

    @staticmethod
    def init_app(app):
        pass

class LocalConfig(Config):
    DEBUG = True
    SQLALCHEMY_DATABASE_URI = r"sqlite:///" + os.path.join(basedir, 
                                 "data-dev.sqlite")
    CELERY_BROKER_URL = 'amqp://guest:guest@localhost:5672//'


config = {
    "local": LocalConfig}

应用程序/ exstensions.py

from flask.ext.sqlalchemy import SQLAlchemy
from celery import Celery

db = SQLAlchemy()
celery = Celery()

应用程序/ factory.py

from extensions import db, celery
from flask import Flask
from flask import g
from config import config

def create_before_request(app):
    def before_request():
        g.db = db
    return before_request


def create_app(config_name):
    app = Flask(__name__)
    app.config.from_object(config[config_name])

    db.init_app(app)
    celery.config_from_object(config)

    # Register the blueprints

    # Add the before request handler
    app.before_request(create_before_request(app))
    return app

应用程序/ manage.py

from factory import create_app

app = create_app("local")

from flask import render_template
from flask import request

@app.route('/test', methods=['POST'])
def task_simple():
    import tasks
    tasks.do_some_stuff.delay()
    return ""

if __name__ == "__main__":
    app.run()

应用程序/ models.py

from extensions import db

class User(db.Model):
    __tablename__ = "user"

    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String(128), unique=True, nullable=False)

应用程序/ tasks.py

from extensions import celery
from celery.signals import task_prerun
from flask import g, current_app


@task_prerun.connect
def close_session(*args, **kwargs):
    with current_app.app_context():
       # use g.db
       print g

@celery.task()
def do_some_stuff():
    with current_app.app_context():
       # use g.db
       print g

在文件夹应用程序中:

  • 启动开发网络服务器: python.exe manage.py

  • 启动 Worker : celery.exe worker -A tasks

我收到一个对我没有任何意义的导入错误 . 我应该以不同方式构建应用程序最后,我想我想要一个非常基本的设置,例如将Flask与工厂模式一起使用,可以使用Flask-SQLAlchmey扩展并让一些需要访问数据库的worker .

任何帮助都非常感谢 .

启动芹菜 Worker 时执行回溯 .

Traceback (most recent call last):

  File "[PATH]\scripts\celery-script.py", line 9, in <module>
    load_entry_point('celery==3.1.13', 'console_scripts', 'celery')()

  File "[PATH]\lib\site-packages\celery\__main__.py", line 30, in main
    main()

  File "[PATH]\lib\site-packages\celery\bin\celery.py", line 81, in main
    cmd.execute_from_commandline(argv)

  File "[PATH]\lib\site-packages\celery\bin\celery.py", line 769, in execute_from_commandline
    super(CeleryCommand, self).execute_from_commandline(argv)))

  File "[PATH]\lib\site-packages\celery\bin\base.py", line 305, in execute_from_commandline
    argv = self.setup_app_from_commandline(argv)

  File "[PATH]\lib\site-packages\celery\bin\base.py", line 473, in setup_app_from_commandline
    user_preload = tuple(self.app.user_options['preload'] or ())
AttributeError: 'Flask' object has no attribute 'user_options'

UPDATE 我根据评论中的建议更改了代码 . 工作人员现在启动但是在使用 http://127.0.0.1:5000/test 的get请求进行测试时 . 我得到以下回溯:

Traceback (most recent call last):
  File "[PATH]\lib\site-packages\celery\app\trace.py", line 230, in trace_task
    args=args, kwargs=kwargs)

  File "[PATH]\lib\site-packages\celery\utils\dispatch\signal.py", line 166, in send
    response = receiver(signal=self, sender=sender, \**named)

  File "[PATH]\app\stackoverflow\tasks.py", line 7, in close_session
    with current_app.app_context():

  File "[PATH]\lib\site-packages\werkzeug\local.py", line 338, in __getattr__
    return getattr(self._get_current_object(), name)

  File "[PATH]\lib\site-packages\werkzeug\local.py", line 297, in _get_current_object
    return self.__local()

  File "[PATH]\lib\site-packages\flask\globals.py", line 34, in _find_app
    raise RuntimeError('working outside of application context')
RuntimeError: working outside of application context exc, exc_info.traceback)))

UPDATE 根据Marteen的评论,我改变了代码 . 当前的工作版本位于:https://gist.github.com/anonymous/fa47834db2f4f3b8b257 . 欢迎任何进一步的改进或建议 .

2 回答

  • 14

    我关注了current_app的建议 .

    您的celery对象需要访问应用程序上下文 . 我在网上找到了一些关于使用工厂函数创建Celery对象的信息 . 在没有消息代理的情况下测试以下示例 .

    #factory.py
    from celery import Celery
    from config import config
    
    def create_celery_app(app=None):
        app = app or create_app(config)
        celery = Celery(__name__, broker=app.config['CELERY_BROKER_URL'])
        celery.conf.update(app.config)
        TaskBase = celery.Task
    
        class ContextTask(TaskBase):
            abstract = True
    
            def __call__(self, *args, **kwargs):
                with app.app_context():
                    return TaskBase.__call__(self, *args, **kwargs)
    
        celery.Task = ContextTask
        return celery
    

    并在tasks.py中:

    #tasks.py
    from factory import create_celery_app
    from celery.signals import task_prerun
    from flask import g
    
    celery = create_celery_app()
    
    @task_prerun.connect
    def celery_prerun(*args, **kwargs):
        #print g
        with celery.app.app_context():
        #   # use g.db
           print g
    
    @celery.task()
    def do_some_stuff():
        with celery.app.app_context():
            # use g.db
            g.user = "test"
            print g.user
    

    一些链接:

    Flask pattern for creating a Celery instance with factory function

    Application using both application factory and celery

    Source for said application's factory.py

    Source for application tasks.py

  • 2

    这是一个解决方案,它适用于烧瓶应用工厂模式,并且还可以使用上下文创建芹菜任务,而无需在任务中明确使用 app.app_context() . 在我的应用程序中,获取该app对象同时避免循环导入是非常棘手的,但这解决了它 . 在撰写本文时,这也适用于最新的芹菜版本4.2 .

    结构体:

    repo_name/
        manage.py
        base/
        base/__init__.py
        base/app.py
        base/runcelery.py
        base/celeryconfig.py
        base/utility/celery_util.py
        base/tasks/workers.py
    

    所以 base 是本例中的主要应用程序包 . 在 base/__init__.py 中我们创建芹菜实例如下:

    from celery import Celery
    celery = Celery('base', config_source='base.celeryconfig')
    

    base/app.py 文件包含烧瓶应用工厂 create_app 并注意它包含的 init_celery(app, celery)

    from base import celery
    from base.utility.celery_util import init_celery
    
    def create_app(config_obj):
        """An application factory, as explained here:
        http://flask.pocoo.org/docs/patterns/appfactories/.
        :param config_object: The configuration object to use.
        """
        app = Flask('base')
        app.config.from_object(config_obj)
        init_celery(app, celery=celery)
        register_extensions(app)
        register_blueprints(app)
        register_errorhandlers(app)
        register_app_context_processors(app)
        return app
    

    继续 base/runcelery.py 内容:

    from flask.helpers import get_debug_flag
    from base.settings import DevConfig, ProdConfig
    from base import celery
    from base.app import create_app
    from base.utility.celery_util import init_celery
    CONFIG = DevConfig if get_debug_flag() else ProdConfig
    app = create_app(CONFIG)
    init_celery(app, celery)
    

    接下来, base/celeryconfig.py 文件(作为示例):

    # -*- coding: utf-8 -*-
    """
    Configure Celery. See the configuration guide at ->
    http://docs.celeryproject.org/en/master/userguide/configuration.html#configuration
    """
    
    ## Broker settings.
    broker_url = 'pyamqp://guest:guest@localhost:5672//'
    broker_heartbeat=0
    
    # List of modules to import when the Celery worker starts.
    imports = ('base.tasks.workers',)
    
    ## Using the database to store task state and results.
    result_backend = 'rpc'
    #result_persistent = False
    
    accept_content = ['json', 'application/text']
    
    result_serializer = 'json'
    timezone = "UTC"
    
    # define periodic tasks / cron here
    # beat_schedule = {
    #    'add-every-10-seconds': {
    #        'task': 'workers.add_together',
    #        'schedule': 10.0,
    #        'args': (16, 16)
    #    },
    # }
    

    现在在 base/utility/celery_util.py 文件中定义init_celery:

    # -*- coding: utf-8 -*-
    
    def init_celery(app, celery):
        """Add flask app context to celery.Task"""
        TaskBase = celery.Task
        class ContextTask(TaskBase):
            abstract = True
            def __call__(self, *args, **kwargs):
                with app.app_context():
                    return TaskBase.__call__(self, *args, **kwargs)
        celery.Task = ContextTask
    

    对于 base/tasks/workers.py 中的 Worker :

    from base import celery as celery_app
    from flask_security.utils import config_value, send_mail
    from base.bp.users.models.user_models import User
    
    @celery_app.task
    def send_welcome_email(email, user_id, confirmation_link):
        """Background task to send a welcome email with flask-security's mail.
        You don't need to use with app.app_context() as Task has app context.
        """
        user = User.query.filter_by(id=user_id).first()
        print(f'sending user {user} a welcome email')
        send_mail(config_value('EMAIL_SUBJECT_REGISTER'),
                  email,
                  'welcome', user=user,
                  confirmation_link=confirmation_link) 
    
    @celery_app.task
    def do_some_stuff():
        print(g)
    

    然后,您需要从 repo_name 文件夹内的两个不同的cmd提示中启动芹菜殴打和芹菜工作者 .

    在一个cmd提示符下执行 celery -A base.runcelery:celery beat ,另一个 celery -A base.runcelery:celery worker .

    然后,执行需要烧瓶上下文的任务 . 应该管用 .

相关问题