Full steps:
- Start Django.
- Start a Celery worker:
  python manage.py celery worker --app=celery_worker:app -Ofair -n W1
- Upload a URL list file; the view loops over the list and sends each URL to a fetch_article task.
- The worker processes the tasks.
- Upload another URL list file.
- The worker does nothing.
views.py:

@csrf_exempt
def upload(request):
    job_name = request.POST.get('job_name')
    if not job_name:
        return JsonResponse(JsonStatus.Error)
    if len(request.FILES) == 1:
        # Python 2: dict.values() is a list; on Python 3 this would need
        # list(request.FILES.values())[0]
        yq_data = request.FILES.values()[0]
    else:
        return JsonResponse(JsonStatus.Error)
    job = Job.objects.create(name=job_name)
    reader = csv.reader(yq_data, delimiter=',')
    task_count = 0
    next(reader)  # skip the header row
    for row in reader:
        url = row[0].strip()
        fetch_article.delay(job.id, url)
        # fetch_article.apply_async(args=[job.id, url], queue=job.queue_name)
        task_count += 1
    # print 'qn%s' % job.queue_name
    # rp = celery_app.control.add_consumer(queue=job.queue_name, reply=True)
    # print rp
    job.task_count = task_count
    job.save()
    return JsonResponse(JsonStatus.OK, msg=task_count)
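The CSV-parsing part of the view can be exercised in isolation. A minimal stdlib-only sketch of that step (the sample data and variable names here are illustrative, not from the project):

```python
import csv
import io

# Stand-in for the uploaded file: a header row, then one URL per row.
sample = io.StringIO("url\nhttp://example.com/a \nhttp://example.com/b\n")

reader = csv.reader(sample, delimiter=',')
next(reader)  # skip the header row, as the view does
urls = [row[0].strip() for row in reader if row]
print(urls)  # ['http://example.com/a', 'http://example.com/b']
```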
tasks.py:

@shared_task()
def fetch_article(job_id, url):
    logger.info(u'fetch_article:%s' % url)
    Processer = get_processor_cls(url)
    a = Article(job_id=job_id, url=url)
    try:
        ap = Processer(url)
        title, text = ap.process()
        a.title = title
        a.content = text
    except Exception as e:
        a.status = 2
        a.error = e
        logger.error(u'fetch_article:%s error:%s' % (url, e))
    a.save()
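The task uses a capture-and-record pattern: on failure it stores status 2 and the error on the Article instead of re-raising, and the row is saved either way. A Celery-free sketch of the same pattern (the function and dict keys are hypothetical stand-ins, not project code):

```python
def process_url(url, processor):
    # Hypothetical stand-in for the Article row: always returned,
    # with status 2 and the error text recorded on failure.
    record = {"url": url, "status": 0, "error": None}
    try:
        record["title"], record["content"] = processor(url)
    except Exception as e:
        record["status"] = 2
        record["error"] = str(e)  # store the text, not the exception object
    return record

def good(url):
    return "Title", "Body"

def bad(url):
    raise ValueError("fetch failed")

print(process_url("http://example.com", good)["status"])  # 0
print(process_url("http://example.com", bad)["error"])    # fetch failed
```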
1 Answer

OK, I found the problem: I had set

CELERY_ALWAYS_EAGER = True

in settings. With this setting the tasks run synchronously in the Django main process, as described in the Celery docs, so the worker never receives anything.
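The settings line in question (Celery 3.x naming, matching the question; newer Celery spells it task_always_eager):

```python
# settings.py
# With this set, .delay() / .apply_async() execute the task inline in the
# calling process and never publish to the broker, so a separate worker
# sees nothing. Remove it (or set it to False) to dispatch to real workers.
CELERY_ALWAYS_EAGER = True
```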
As for why the worker appeared to work the first time, I am still not sure; possibly some URLs were left over in the queue from a previous run.