首页 文章

Flask Celery Python导入

提问于
浏览
0

我在Flask应用程序中集成Celery时遇到问题 . 这是回购https://github.com/theobouwman/community-python .

我通过运行 app.py 启动我的应用程序,导入我的应用程序(添加了蓝图和配置)和Celery .

/tasks/add.py 我有一个示例任务,我再次为 @celery.task 装饰器导入 Celery 对象 .

直到这一点,一切正常 . 我可以运行我的Flask应用程序并运行Celery工作程序 .

但是,当我尝试从蓝图中的控制器中触发任务时,就像这里https://github.com/theobouwman/community-python/blob/master/auth/controllers/RegistrationController.py#L38它说它无法导入它,这是一个逻辑反应 .

Traceback (most recent call last):
  File "app.py", line 2, in <module>
    from flask_app import app
  File "/development/projects/python/Community/flask_app.py", line 4, in <module>
    from auth.routes import auth
  File "/development/projects/python/Community/auth/routes.py", line 3, in <module>
    from .controllers import RegistrationController, AuthenticationController, LogoutController
  File "/development/projects/python/Community/auth/controllers/RegistrationController.py", line 10, in <module>
    from tasks.add import add
  File "/development/projects/python/Community/tasks/add.py", line 1, in <module>
    from app import celery
  File "/development/projects/python/Community/app.py", line 2, in <module>
    from flask_app import app
ImportError: cannot import name 'app'

我不知道如何修复这个导入周期,这就是这个问题的原因 . 我用Google搜索了3个小时但找不到解决方案 . 我希望有人能帮助我 .

空气中是否有Flask Slack或Gitter?

提前致谢 .

1 回答

  • 2

    RegistrationController.py 中的导入更改为本地导入以解决循环导入:

    from ..blueprint import auth
        from models import User
        from flask import redirect, url_for, request, render_template, flash
        import bcrypt
        from ..forms.register import SimpleRegistrationForm
        """
        Error in python3.6 app.py
        Says cyclus import error
        """
        # Comment out the line below
        # from tasks.add import add
    
    
        @auth.route('/register', methods=['GET', 'POST'])
        def register():
            form = SimpleRegistrationForm(request.form)
            if request.method == 'POST' and form.validate():
                fname = request.form['fname']
                sname = request.form['sname']
                email = request.form['email']
                password = request.form['password']
                hashed = bcrypt.hashpw(password.encode('utf-8 '), bcrypt.gensalt())
    
                user = User.select().where(User.email == email)
                if user.exists():
                    flash('Er bestaat al een account met dit email adres')
                    return redirect(url_for('auth.register'))
    
                user = User(fname=fname, sname=sname, email=email, password=hashed)
                user.save()
    
                flash('Uw account is aangemaakt. Kijk in uw mailbox voor de activatie link')
                return redirect(url_for('auth.register'))
            return render_template('pages/register.html', form=form)
    
    
        @auth.route('/register/test')
        def register_test():
            # local import avoids the cycle
            from tasks.add import add
            add.delay()
            # hashed = bcrypt.hashpw('test'.encode('utf-8 '), bcrypt.gensalt())
            # user = User(
            #     fname='Theo',
            #     sname='Bouwman',
            #     email='theobouwman98@gmail.com',
            #     password=hashed
            # )
            # user.save()
        return redirect(url_for('auth.login'))
    

相关问题