首页 文章

如何将周期性任务发送到Celery中的特定队列

提问于
浏览
30

默认情况下,Celery将所有任务发送到“celery”队列,但您可以通过添加额外参数来更改此行为:

@task(queue='celery_periodic')
def recalc_last_hour():
    log.debug('sending new task')
    recalc_hour.delay(datetime(2013, 1, 1, 2)) # for example

调度程序设置:

CELERYBEAT_SCHEDULE = {
   'installer_recalc_hour': {
        'task': 'stats.installer.tasks.recalc_last_hour',
        'schedule': 15  # every 15 sec for test
    },
}
CELERYBEAT_SCHEDULER = "djcelery.schedulers.DatabaseScheduler"

跑步 Worker :

python manage.py celery worker -c 1 -Q celery_periodic -B -E

此方案无法按预期工作:此工作人员将定期任务发送到“芹菜”队列,而不是“celery_periodic” . 我该如何解决这个问题?

附:芹菜== 3.0.16

3 回答

  • 22

    定期被celerybeat发送到队列 . 你可以做我们用Celery api做的每件事 . 以下是celerybeat附带的配置列表 .

    http://celery.readthedocs.org/en/latest/userguide/periodic-tasks.html#available-fields

    在你的情况下

    CELERYBEAT_SCHEDULE = {
       'installer_recalc_hour': {
            'task': 'stats.installer.tasks.recalc_last_hour',
            'schedule': 15  # every 15 sec for test,
            'options': {'queue' : 'celery_periodic'} ##options are mapped to apply_async options
        },
    }
    
  • 36

    我找到了解决这个问题的方法:

    1)首先,我改变了配置周期性任务的方法 . 我像这样使用 @periodic_task 装饰器:

    @periodic_task(run_every=crontab(minute='5'),
                   queue='celery_periodic',
                   options={'queue': 'celery_periodic'})
    def recalc_last_hour():
        dt = datetime.utcnow()
        prev_hour = datetime(dt.year, dt.month, dt.day, dt.hour) \
                    - timedelta(hours=1)
        log.debug('Generating task for hour %s', str(prev_hour))
        recalc_hour.delay(prev_hour)
    

    2)我在 @periodic_task 中用params写了两次 celery_periodic

    从代码调用任务时使用

    • queue='celery_periodic' 选项(.delay或.apply_async)

    当芹菜殴打它时,使用

    • options={'queue': 'celery_periodic'} 选项 .

    我敢肯定,如果你用CELERYBEAT_SCHEDULE变量配置周期性任务,同样的事情是可能的 .

    UPD . 此解决方案适用于 CELERYBEAT_SCHEDULER 的基于数据库和基于文件的存储 .

  • 2

    如果您使用的是djcelery数据库调度程序,则可以在执行选项 - >队列字段中指定队列

相关问题