
Flask Celery Redis: consumer: Cannot connect to amqp://guest:**@127.0.0.1:5672//: timed out

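I have a simple Celery task set up. To run it, I first start redis-server, then activate the virtual environment and run "celery beat". I then open a new terminal window, activate the virtual environment there too, and run "celery worker".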

Flask==1.0.2
celery==4.2.1
requests==2.19

This is the error message I get afterwards:

consumer: Cannot connect to amqp://guest:**@127.0.0.1:5672//: timed out.

These are the configuration details shown after running 'celery beat':

Configuration ->
    . broker -> amqp://guest:**@localhost:5672//
    . loader -> celery.loaders.default.Loader
    . scheduler -> celery.beat.PersistentScheduler
    . db -> celerybeat-schedule
    . logfile -> [stderr]@%WARNING
    . maxinterval -> 5.00 minutes (300s)

flask-proj/app/__init__.py

from flask import Flask, request, jsonify
from celery import Celery
import celeryconfig

app = Flask(__name__)
app.config.from_object('config')

def make_celery(app):
    # create context tasks in celery
    celery = Celery(
        app.import_name,
        broker=app.config['BROKER_URL']
    )
    celery.conf.update(app.config)
    celery.config_from_object(celeryconfig)
    TaskBase = celery.Task

    class ContextTask(TaskBase):
        abstract = True

        def __call__(self, *args, **kwargs):
            with app.app_context():
                return TaskBase.__call__(self, *args, **kwargs)

    celery.Task = ContextTask

    return celery

celery = make_celery(app)

@app.route("/")
def hello():
    return "Hello World!"

flask-proj/tasks/test.py

import celery

@celery.task()
def print_hello():
    logger = print_hello.get_logger()
    logger.info("Hello")

flask-proj/config.py

import os

REDIS_HOST = "127.0.0.1"
REDIS_PORT = 6379
BROKER_URL = os.environ.get(
    'REDIS_URL', "redis://{host}:{port}/0".format(
        host=REDIS_HOST, port=str(REDIS_PORT)))
CELERY_RESULT_BACKEND = BROKER_URL

flask-proj/celeryconfig.py

from celery.schedules import crontab

CELERY_IMPORTS = ('app.tasks.test')
CELERY_TASK_RESULT_EXPIRES = 30
CELERY_TIMEZONE = 'UTC'

CELERY_ACCEPT_CONTENT = ['json', 'msgpack', 'yaml']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'

CELERYBEAT_SCHEDULE = {
    'test-celery': {
        'task': 'app.tasks.test.print_hello',
        # Every minute
        'schedule': crontab(minute="*"),
    }
}

Please let me know if I need to provide any other details.

3 Answers

  • 1

    Remove the celery.conf.update(app.config) line from the make_celery() function (it is marked in the snippet below):

    def make_celery(app):
        # create context tasks in celery
        celery = Celery(
            app.import_name,
            broker=app.config['BROKER_URL']
        )
        celery.conf.update(app.config) # remove this line.
        celery.config_from_object(celeryconfig)
        TaskBase = celery.Task
    

    Then copy the contents of flask-proj/config.py into flask-proj/celeryconfig.py.

    The resulting flask-proj/celeryconfig.py would look like this:

    from celery.schedules import crontab
    
    import os
    
    REDIS_HOST = "127.0.0.1"
    REDIS_PORT = 6379
    BROKER_URL = os.environ.get(
        'REDIS_URL', "redis://{host}:{port}/0".format(
            host=REDIS_HOST, port=str(REDIS_PORT)))
    CELERY_RESULT_BACKEND = BROKER_URL
    
    CELERY_IMPORTS = ('app.tasks.test')
    CELERY_TASK_RESULT_EXPIRES = 30
    CELERY_TIMEZONE = 'UTC'
    
    CELERY_ACCEPT_CONTENT = ['json', 'msgpack', 'yaml']
    CELERY_TASK_SERIALIZER = 'json'
    CELERY_RESULT_SERIALIZER = 'json'
    
    CELERYBEAT_SCHEDULE = {
        'test-celery': {
            'task': 'app.tasks.test.print_hello',
            # Every minute
            'schedule': crontab(minute="*"),
        }
    }
    
  • 0

    amqp is the RabbitMQ transport, not Redis.

    A Redis broker URL usually has the form

    redis://:password@hostname:port/db_number

    I would configure it manually to see whether that works.

    flask_app.config.update(
        CELERY_BROKER_URL='redis://localhost:6379',
        CELERY_RESULT_BACKEND='redis://localhost:6379'
    )
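
To make the scheme difference above concrete, here is a minimal sketch that uses only Python's standard library to read the transport and port out of a broker URL. The helper name broker_transport is hypothetical and introduced only for illustration; it is not part of Celery or Flask. The two URLs are the one from the error message and the one that the question's config.py builds.

```python
from urllib.parse import urlsplit


def broker_transport(url):
    """Return the transport scheme of a broker URL, e.g. 'redis' or 'amqp'.

    Hypothetical helper for illustration only; not a Celery API.
    """
    return urlsplit(url).scheme


# URL shown in the error message: the amqp scheme on port 5672 (RabbitMQ).
default_url = "amqp://guest:**@127.0.0.1:5672//"

# URL that the question's config.py builds: the redis scheme on port 6379.
redis_url = "redis://{host}:{port}/0".format(host="127.0.0.1", port=6379)

for url in (default_url, redis_url):
    parts = urlsplit(url)
    print(broker_transport(url), parts.hostname, parts.port)
```

If the worker or beat banner reports an amqp:// broker even though a redis:// URL was configured, that suggests the Redis setting never reached the Celery app that is actually running.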
    
  • 0

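    I ran into the same problem in Django, but my issue was that I had used "BROKER_URL" instead of "CELERY_BROKER_URL" in settings.py. Celery did not find the URL and defaulted to the RabbitMQ port instead of the Redis port.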
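
The Django case in the last answer can be illustrated with a simplified model. It assumes the project loads its settings with a "CELERY" namespace, for example via app.config_from_object('django.conf:settings', namespace='CELERY'), so that only keys carrying the CELERY_ prefix are considered. The function resolve_broker_url below is hypothetical and is not Celery's actual implementation; it only mimics the prefix lookup and the fallback to the default AMQP URL.

```python
# Default broker URL Celery falls back to when none is configured.
DEFAULT_BROKER_URL = "amqp://guest:guest@localhost:5672//"


def resolve_broker_url(settings, namespace="CELERY"):
    """Simplified model of a namespaced settings lookup (not Celery's code).

    Only keys that start with '<namespace>_' are considered; the prefix is
    stripped and the remainder lowercased before looking up 'broker_url'.
    """
    prefix = namespace + "_"
    namespaced = {
        key[len(prefix):].lower(): value
        for key, value in settings.items()
        if key.startswith(prefix)
    }
    return namespaced.get("broker_url", DEFAULT_BROKER_URL)


# Key without the prefix: ignored, so the AMQP default is used.
wrong = {"BROKER_URL": "redis://localhost:6379/0"}
# Key with the prefix: picked up as the broker URL.
right = {"CELERY_BROKER_URL": "redis://localhost:6379/0"}

print(resolve_broker_url(wrong))
print(resolve_broker_url(right))
```

Under this model, the unprefixed key yields the default amqp:// URL, which matches the symptom described in the answer.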
