我真的很难为Flask,SQLAlchemy和Celery进行适当的设置 . 我已经广泛搜索并尝试了不同的方法,似乎没有什么工作 . 要么我错过了应用程序上下文,要么无法运行工作程序,或者存在其他一些问题 . 结构非常通用,因此我可以构建更大的应用程序 .
我正在使用:Flask 0.10.1,SQLAlchemy 1.0,Celery 3.1.13,我目前的设置如下:
应用程序/ __ init__.py
#Empty
应用程序/ config.py
import os
basedir = os.path.abspath(os.path.dirname(__file__))
class Config:
@staticmethod
def init_app(app):
pass
class LocalConfig(Config):
DEBUG = True
SQLALCHEMY_DATABASE_URI = r"sqlite:///" + os.path.join(basedir,
"data-dev.sqlite")
CELERY_BROKER_URL = 'amqp://guest:guest@localhost:5672//'
config = {
"local": LocalConfig}
应用程序/ exstensions.py
from flask.ext.sqlalchemy import SQLAlchemy
from celery import Celery
db = SQLAlchemy()
celery = Celery()
应用程序/ factory.py
from extensions import db, celery
from flask import Flask
from flask import g
from config import config
def create_before_request(app):
def before_request():
g.db = db
return before_request
def create_app(config_name):
app = Flask(__name__)
app.config.from_object(config[config_name])
db.init_app(app)
celery.config_from_object(config)
# Register the blueprints
# Add the before request handler
app.before_request(create_before_request(app))
return app
应用程序/ manage.py
from factory import create_app
app = create_app("local")
from flask import render_template
from flask import request
@app.route('/test', methods=['POST'])
def task_simple():
import tasks
tasks.do_some_stuff.delay()
return ""
if __name__ == "__main__":
app.run()
应用程序/ models.py
from extensions import db
class User(db.Model):
__tablename__ = "user"
id = db.Column(db.Integer, primary_key=True)
username = db.Column(db.String(128), unique=True, nullable=False)
应用程序/ tasks.py
from extensions import celery
from celery.signals import task_prerun
from flask import g, current_app
@task_prerun.connect
def close_session(*args, **kwargs):
with current_app.app_context():
# use g.db
print g
@celery.task()
def do_some_stuff():
with current_app.app_context():
# use g.db
print g
在文件夹应用程序中:
-
启动开发网络服务器:
python.exe manage.py
-
启动 Worker :
celery.exe worker -A tasks
我收到一个对我没有任何意义的导入错误 . 我应该以不同方式构建应用程序最后,我想我想要一个非常基本的设置,例如将Flask与工厂模式一起使用,可以使用Flask-SQLAlchmey扩展并让一些需要访问数据库的worker .
任何帮助都非常感谢 .
启动芹菜 Worker 时执行回溯 .
Traceback (most recent call last):
File "[PATH]\scripts\celery-script.py", line 9, in <module>
load_entry_point('celery==3.1.13', 'console_scripts', 'celery')()
File "[PATH]\lib\site-packages\celery\__main__.py", line 30, in main
main()
File "[PATH]\lib\site-packages\celery\bin\celery.py", line 81, in main
cmd.execute_from_commandline(argv)
File "[PATH]\lib\site-packages\celery\bin\celery.py", line 769, in execute_from_commandline
super(CeleryCommand, self).execute_from_commandline(argv)))
File "[PATH]\lib\site-packages\celery\bin\base.py", line 305, in execute_from_commandline
argv = self.setup_app_from_commandline(argv)
File "[PATH]\lib\site-packages\celery\bin\base.py", line 473, in setup_app_from_commandline
user_preload = tuple(self.app.user_options['preload'] or ())
AttributeError: 'Flask' object has no attribute 'user_options'
UPDATE 我根据评论中的建议更改了代码 . 工作人员现在启动但是在使用 http://127.0.0.1:5000/test
的get请求进行测试时 . 我得到以下回溯:
Traceback (most recent call last):
File "[PATH]\lib\site-packages\celery\app\trace.py", line 230, in trace_task
args=args, kwargs=kwargs)
File "[PATH]\lib\site-packages\celery\utils\dispatch\signal.py", line 166, in send
response = receiver(signal=self, sender=sender, \**named)
File "[PATH]\app\stackoverflow\tasks.py", line 7, in close_session
with current_app.app_context():
File "[PATH]\lib\site-packages\werkzeug\local.py", line 338, in __getattr__
return getattr(self._get_current_object(), name)
File "[PATH]\lib\site-packages\werkzeug\local.py", line 297, in _get_current_object
return self.__local()
File "[PATH]\lib\site-packages\flask\globals.py", line 34, in _find_app
raise RuntimeError('working outside of application context')
RuntimeError: working outside of application context exc, exc_info.traceback)))
UPDATE 根据Marteen的评论,我改变了代码 . 当前的工作版本位于:https://gist.github.com/anonymous/fa47834db2f4f3b8b257 . 欢迎任何进一步的改进或建议 .
2 回答
我关注了current_app的建议 .
您的celery对象需要访问应用程序上下文 . 我在网上找到了一些关于使用工厂函数创建Celery对象的信息 . 在没有消息代理的情况下测试以下示例 .
并在tasks.py中:
一些链接:
Flask pattern for creating a Celery instance with factory function
Application using both application factory and celery
Source for said application's factory.py
Source for application tasks.py
这是一个解决方案,它适用于烧瓶应用工厂模式,并且还可以使用上下文创建芹菜任务,而无需在任务中明确使用
app.app_context()
. 在我的应用程序中,获取该app对象同时避免循环导入是非常棘手的,但这解决了它 . 在撰写本文时,这也适用于最新的芹菜版本4.2 .结构体:
所以
base
是本例中的主要应用程序包 . 在base/__init__.py
中我们创建芹菜实例如下:base/app.py
文件包含烧瓶应用工厂create_app
并注意它包含的init_celery(app, celery)
:继续
base/runcelery.py
内容:接下来,
base/celeryconfig.py
文件(作为示例):现在在
base/utility/celery_util.py
文件中定义init_celery:对于
base/tasks/workers.py
中的 Worker :然后,您需要从
repo_name
文件夹内的两个不同的cmd提示中启动芹菜殴打和芹菜工作者 .在一个cmd提示符下执行
celery -A base.runcelery:celery beat
,另一个celery -A base.runcelery:celery worker
.然后,执行需要烧瓶上下文的任务 . 应该管用 .