首页 文章

使用Celery问题Flask RESTful create_app

提问于
浏览
0

我是Python Flask RESTful API的新手 . 现在,我正在开发的项目是由以前的开发人员预先构建的 . 我能够在这个项目中加入更多的逻辑 . 现在,随着要求的发展,我需要使用CELERY进行大量计算 . 我浏览了WEB中的不同文章(例如https://blog.miguelgrinberg.com/post/celery-and-the-flask-application-factory-pattern)和其他文章,但仍然没有运气 .

[repo: tracker/]
   __init.py__
   app.py
   config.py
   models.py
   celery.py
   tasks.py
   /resources/locate.py
   /resources/create.py

init .py - 具有以下内容:

from tracker import app

app.py - 具有以下内容:

from tracker.resources.locate import Locate
from tracker.resources.create import Create

from .celery import create_celery
from .redis_repo import redis_store
from .config import app_config
from .models import db

import collections

def create_app(config_name):

    app = Flask(__name__)
    app.config.from_object(app_config[config_name])

    api = Api(app, catch_all_404s=True)
    api.add_resource(Locate, '/api/v1/locate/<string:ud>', methods=['GET'])
    api.add_resource(Create, '/api/v1/create/<string:ud>', methods=['GET','POST'])

    redis_store.init_app(app)
    db.init_app(app)

       return app

    app = create_app('development')

    if __name__ == '__main__':
        app.run(threaded=True, debug=False)

config.py - 具有以下内容:

import tempfile

class DevelopmentConfig(Config):
    DEBUG = True
    SQLALCHEMY_POOL_SIZE = 10
    SQLALCHEMY_DATABASE_URI = 'postgres://xxx'
    REDIS_URL = "redis://127.0.0.1:32769/0" 
    CELERY_BROKER_URL = "redis://127.0.0.1:32769/0"
    CELERY_BACKEND = "db+postgresql://xxx" 

app_config = {'development': DevelopmentConfig}

models.py - 具有以下内容:

from flask_sqlalchemy import SQLAlchemy
from sqlalchemy import func, exc, or_
import datetime
from sqlalchemy_utils import UUIDType

import tracker

db = SQLAlchemy()

class T_Logs(db.Model)
      ....
      ....
      ....

locate.py - 具有以下内容:

from flask import request, current_app, jsonify, after_this_request
from flask_restful import Resource
from sqlalchemy import exc

from tracker import models, config, redis_repo, utility
from datetime import datetime, timedelta

import uuid, json

class Locate(Resource):

    def get(self, ud):
    ....
    ....

使用这些文档或教程,可以很容易地运行并理解它们是如何工作的 . 但在我们的实现中完全放入它似乎不起作用 .

用celery.py

from celery import Celery

def create_celery(app=None):
    application = app or create_app
    celery = Celery(app.import_name,
                    broker=app.config['CELERY_BROKER_URL'])
    celery.conf.update(app.config)
    TaskBase = celery.Task

    class ContextTask(TaskBase):
        abstract = True

        def __call__(self, *args, **kwargs):
            with application.app_context():
                return TaskBase.__call__(self, *args, **kwargs)

    celery.Task = ContextTask
    return celery

并示例tasks.py

from app import create_celery

@celery.task(name="tasks.add")
def add(a,b):
    return a+ b

因此,当我通过visual studio代码运行这些代码时,我没有看到任何错误或警告 . 但是当我在另一个命令提示符下运行它来启动worker时:

celery -A app.tasks.add worker -l info -P eventlet

它来自tracker.resources.locate导入定位ModuleNotFoundError:没有名为'tracker'的模块

它永远不会被执行 . 我还没有尝试在资源上调用任务,但我还是不能理解如何做到这一点 .

如果有人可以用这个来启发我,我将不胜感激 .

1 回答

  • 0

    我已经设法解决了这个问题,似乎问题是在绝对的道路上 .

    所以做了一些相关的改变,如:

    try:
       from tracker.resources.locate import Locate
    except ImportError:
       from resources.locate import Locate
    

    与其他进口声明一样 .

    现在我的下一个问题是,如何调用任务让我们说在GET方法中定位路径 .

    class Locate(Resource):
    
        def get(self, ud):
    

    如果有任何想法,请告诉我 .

相关问题