我是Python Flask RESTful API的新手 . 现在,我正在开发的项目是由以前的开发人员预先构建的 . 我能够在这个项目中加入更多的逻辑 . 现在,随着要求的发展,我需要使用CELERY进行大量计算 . 我浏览了WEB中的不同文章(例如https://blog.miguelgrinberg.com/post/celery-and-the-flask-application-factory-pattern)和其他文章,但仍然没有运气 .
[repo: tracker/]
__init.py__
app.py
config.py
models.py
celery.py
tasks.py
/resources/locate.py
/resources/create.py
init .py - 具有以下内容:
from tracker import app
app.py - 具有以下内容:
from tracker.resources.locate import Locate
from tracker.resources.create import Create
from .celery import create_celery
from .redis_repo import redis_store
from .config import app_config
from .models import db
import collections
def create_app(config_name):
app = Flask(__name__)
app.config.from_object(app_config[config_name])
api = Api(app, catch_all_404s=True)
api.add_resource(Locate, '/api/v1/locate/<string:ud>', methods=['GET'])
api.add_resource(Create, '/api/v1/create/<string:ud>', methods=['GET','POST'])
redis_store.init_app(app)
db.init_app(app)
return app
app = create_app('development')
if __name__ == '__main__':
app.run(threaded=True, debug=False)
config.py - 具有以下内容:
import tempfile
class DevelopmentConfig(Config):
DEBUG = True
SQLALCHEMY_POOL_SIZE = 10
SQLALCHEMY_DATABASE_URI = 'postgres://xxx'
REDIS_URL = "redis://127.0.0.1:32769/0"
CELERY_BROKER_URL = "redis://127.0.0.1:32769/0"
CELERY_BACKEND = "db+postgresql://xxx"
app_config = {'development': DevelopmentConfig}
models.py - 具有以下内容:
from flask_sqlalchemy import SQLAlchemy
from sqlalchemy import func, exc, or_
import datetime
from sqlalchemy_utils import UUIDType
import tracker
db = SQLAlchemy()
class T_Logs(db.Model)
....
....
....
locate.py - 具有以下内容:
from flask import request, current_app, jsonify, after_this_request
from flask_restful import Resource
from sqlalchemy import exc
from tracker import models, config, redis_repo, utility
from datetime import datetime, timedelta
import uuid, json
class Locate(Resource):
def get(self, ud):
....
....
使用这些文档或教程,可以很容易地运行并理解它们是如何工作的 . 但在我们的实现中完全放入它似乎不起作用 .
用celery.py
from celery import Celery
def create_celery(app=None):
application = app or create_app
celery = Celery(app.import_name,
broker=app.config['CELERY_BROKER_URL'])
celery.conf.update(app.config)
TaskBase = celery.Task
class ContextTask(TaskBase):
abstract = True
def __call__(self, *args, **kwargs):
with application.app_context():
return TaskBase.__call__(self, *args, **kwargs)
celery.Task = ContextTask
return celery
并示例tasks.py
from app import create_celery
@celery.task(name="tasks.add")
def add(a,b):
return a+ b
因此,当我通过visual studio代码运行这些代码时,我没有看到任何错误或警告 . 但是当我在另一个命令提示符下运行它来启动worker时:
celery -A app.tasks.add worker -l info -P eventlet
它来自tracker.resources.locate导入定位ModuleNotFoundError:没有名为'tracker'的模块
它永远不会被执行 . 我还没有尝试在资源上调用任务,但我还是不能理解如何做到这一点 .
如果有人可以用这个来启发我,我将不胜感激 .
1 回答
我已经设法解决了这个问题,似乎问题是在绝对的道路上 .
所以做了一些相关的改变,如:
与其他进口声明一样 .
现在我的下一个问题是,如何调用任务让我们说在GET方法中定位路径 .
如果有任何想法,请告诉我 .