首页 文章

Celery / RabbitMQ / Django没有运行任务

提问于
浏览
2

我希望有人可以帮助我,因为我查看了Stack Overflow并找不到我的问题的解决方案 . 我正在运行一个Django项目,并安装了Supervisor,RabbitMQ和Celery . RabbitMQ启动并运行,Supervisor确保我的celerybeat正在运行,但是,当它记录节拍已开始并每5分钟发送一次任务(见下文)时,任务实际上从未执行:

我的主管计划conf:

[program:nrv_twitter]
; Set full path to celery program if using virtualenv
command=/Users/tsantor/.virtualenvs/nrv_env/bin/celery beat -A app --loglevel=INFO --pidfile=/tmp/nrv-celerybeat.pid --schedule=/tmp/nrv-celerybeat-schedule

; Project dir
directory=/Users/tsantor/Projects/NRV/nrv

; Logs
stdout_logfile=/Users/tsantor/Projects/NRV/nrv/logs/celerybeat_twitter.log
redirect_stderr=true

autorestart=true
autostart=true
startsecs=10
user=tsantor

; if rabbitmq is supervised, set its priority higher so it starts first
priority=999

以下是上述程序的日志输出:

[2014-12-16 20:29:42,293: INFO/MainProcess] beat: Starting...
[2014-12-16 20:34:08,161: INFO/MainProcess] Scheduler: Sending due task gettweets-every-5-mins (twitter.tasks.get_tweets)
[2014-12-16 20:39:08,186: INFO/MainProcess] Scheduler: Sending due task gettweets-every-5-mins (twitter.tasks.get_tweets)
[2014-12-16 20:44:08,204: INFO/MainProcess] Scheduler: Sending due task gettweets-every-5-mins (twitter.tasks.get_tweets)
[2014-12-16 20:49:08,205: INFO/MainProcess] Scheduler: Sending due task gettweets-every-5-mins (twitter.tasks.get_tweets)
[2014-12-16 20:54:08,223: INFO/MainProcess] Scheduler: Sending due task gettweets-every-5-mins (twitter.tasks.get_tweets)

这是我的 celery.py 设置文件:

from datetime import timedelta

BROKER_URL = 'amqp://guest:guest@localhost//'

CELERY_DISABLE_RATE_LIMITS = True

CELERYBEAT_SCHEDULE = {
    'gettweets-every-5-mins': {
        'task': 'twitter.tasks.get_tweets',
        'schedule': timedelta(seconds=300) # 300 = every 5 minutes
    },
}

这是我的 celeryapp.py

from __future__ import absolute_import
import os
from django.conf import settings
from celery import Celery

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'app.settings')

app = Celery('app')
app.config_from_object('django.conf:settings')
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)

这是我的 twitter/tasks.py

from __future__ import absolute_import
import logging
from celery import shared_task
from twitter.views import IngestTweets

log = logging.getLogger('custom.log')

@shared_task
def get_tweets():
    """
    Get tweets and save them to the DB
    """
    instance = IngestTweets()
    IngestTweets.get_new_tweets(instance)

    log.info('Successfully ingested tweets via celery task')
    return True

get_tweets 方法永远不会被执行,但是我知道它可以正常工作,因为我可以手动执行 get_tweets 并且它工作正常 .

我花了两天时间试图弄清楚为什么要发送应有的任务,但不执行它们?任何帮助是极大的赞赏 . 提前致谢 .

2 回答

  • 0

    user2097159感谢你指出我正确的方向,我不知道我还必须使用主管来管理一名 Worker . 我认为这不过是一个 Worker 或一个节拍,但现在我明白我必须有一个 Worker 来处理任务和一个节拍定期开除任务 .

    以下是主管缺少的工作人员配置:

    [program:nrv_celery_worker]
    ; Worker
    command=/Users/tsantor/.virtualenvs/nrv_env/bin/celery worker -A app --loglevel=INFO
    
    ; Project dir
    directory=/Users/tsantor/Projects/NRV/nrv
    
    ; Logs
    stdout_logfile=/Users/tsantor/Projects/NRV/nrv/logs/celery_worker.log
    redirect_stderr=true
    
    autostart=true
    autorestart=true
    startsecs=10
    user=tsantor
    numprocs=1
    
    ; Need to wait for currently executing tasks to finish at shutdown.
    ; Increase this if you have very long running tasks.
    stopwaitsecs = 600
    
    ; When resorting to send SIGKILL to the program to terminate it
    ; send SIGKILL to its whole process group instead,
    ; taking care of its children as well.
    killasgroup=true
    
    ; if rabbitmq is supervised, set its priority higher
    ; so it starts first
    priority=998
    

    然后我重置了RabbitMQ队列 . 既然我有通过主管管理的节拍和工作程序,所有都按预期工作 . 希望这有助于其他人 .

  • 2

    您需要启动工作进程和节拍进程 . 您可以创建单独的进程as described in tsantor's answer,也可以使用worker和beat创建单个进程 . 这在开发过程中可能更方便(但不建议用于 生产环境 ) .

    来自"Starting the scheduler" in the Celery documentation

    你也可以通过启用workers -B选项在工作者中嵌入节拍,如果你永远不会运行多个工作节点,这很方便,但它并不常用,因此不推荐用于 生产环境 用途:$芹菜 - 项目 Worker -B

    要在Supervisor配置文件中表达,请参阅https://github.com/celery/celery/tree/master/extra/supervisord/(从"Daemonization"链接)

相关问题