首页 文章

芹菜 Worker 只工作一次

提问于
浏览
0

完整步骤:

  • 开始django

  • 开始一个芹菜 Worker

python manage.py celery worker --app = celery_worker:app -Ofair -n W1

  • 上传一个url列表文件,循环url列表发送每个url到一个任务 fetch_article

  • Worker 工作

  • 上传另一个url列表文件

  • Worker 没有行动

views.py:

@csrf_exempt
def upload(request):

    job_name = request.POST.get('job_name')
    if not job_name:
        return JsonResponse(JsonStatus.Error)

    if len(request.FILES) == 1:
        yq_data = request.FILES.values()[0]
    else:
        return JsonResponse(JsonStatus.Error)

    job = Job.objects.create(name=job_name)

    reader = csv.reader(yq_data, delimiter=',')

    task_count = 0

    next(reader)
    for row in reader:
        url = row[0].strip()
        fetch_article.delay(job.id, url)
        # fetch_article.apply_async(args=[job.id, url], queue=job.queue_name)
        task_count += 1


    # print 'qn%s' % job.queue_name
    # rp = celery_app.control.add_consumer(queue=job.queue_name, reply=True)
    # print rp

    job.task_count = task_count
    job.save()

    return JsonResponse(JsonStatus.OK, msg=task_count)

tasks.py

@shared_task()
def fetch_article(job_id, url):

    logger.info(u'fetch_article:%s' % url)

    Processer = get_processor_cls(url)

    a = Article(job_id=job_id, url=url)
    try:
        ap = Processer(url)
        title, text = ap.process()
        a.title = title
        a.content = text

    except Exception as e:
        a.status = 2
        a.error = e
        logger.error(u'fetch_article:%s error:%s' % (url, e))

    a.save()

1 回答

  • 0

    好的,我发现了问题 .

    因为我在设置中设置了 CELERY_ALWAYS_EAGER = True . 任务在django主进程中运行,所以 Worker 没有动作

    来自doc:

    CELERY_ALWAYS_EAGER如果为True,则将通过阻塞在本地执行所有任务,直到任务返回为止 . apply_async()和Task.delay()将返回一个EagerResult实例,该实例模拟AsyncResult的API和行为,但结果已经过评估 . 也就是说,任务将在本地执行,而不是发送到队列 .

    对于工作人员第一次工作,我仍然感到困惑,可能是在以前的工作队列中有一些网址 .

相关问题