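I'm trying to build a test Flask application that calls a Celery chain, gets the task IDs from the chained tasks, and checks each task's status. In the command prompt where I started the Celery worker I can see the tasks succeed, but when I check a task's state it is always "PENDING" and I can't get the result.
I'm using Windows 10 and Celery 3.1.25, since Celery dropped Windows support as of 4.0.
Celery worker command: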
python -m celery worker -A app.celeryApp --loglevel=info --pool=eventlet
Celery worker log:
-------------- celery@xxx v3.1.25 (Cipater)
---- **** -----
--- * *** * -- Windows-10-10.0.15063-SP0
-- * - **** ---
- ** ---------- [config]
- ** ---------- .> app: app:0x23dd01de3c8
- ** ---------- .> transport: amqp://guest:**@localhost:5672//
- ** ---------- .> results: rpc://
- *** --- * --- .> concurrency: 4 (eventlet)
-- ******* ----
--- ***** ----- [queues]
-------------- .> celery exchange=celery(direct) key=celery
[tasks]
. testModule.tasks.add
[2018-04-25 15:56:36,510: INFO/MainProcess] Connected to amqp://guest:**@127.0.0.1:5672//
[2018-04-25 15:56:36,527: INFO/MainProcess] mingle: searching for neighbors
[2018-04-25 15:56:37,548: INFO/MainProcess] mingle: all alone
[2018-04-25 15:56:37,568: INFO/MainProcess] pidbox: Connected to amqp://guest:**@127.0.0.1:5672//.
[2018-04-25 15:56:37,570: WARNING/MainProcess] celery@xxx ready.
[2018-04-25 15:56:49,590: INFO/MainProcess] Received task: testModule.tasks.add[2a6dbfca-a53b-4beb-8386-14ae06c2b076]
[2018-04-25 15:56:54,643: INFO/MainProcess] Task testModule.tasks.add[2a6dbfca-a53b-4beb-8386-14ae06c2b076] succeeded in 5.062999999965541s: 14
[2018-04-25 15:56:54,644: INFO/MainProcess] Received task: testModule.tasks.add[8d97e7d1-f06a-4e23-be35-f02c16865c8b]
[2018-04-25 15:56:59,659: INFO/MainProcess] Task testModule.tasks.add[8d97e7d1-f06a-4e23-be35-f02c16865c8b] succeeded in 5.01500000001397s: 19
[2018-04-25 15:56:59,660: INFO/MainProcess] Received task: testModule.tasks.add[fd112bed-c565-4e82-8806-890da0b952f0]
[2018-04-25 15:57:04,659: INFO/MainProcess] Task testModule.tasks.add[fd112bed-c565-4e82-8806-890da0b952f0] succeeded in 5.0s: 22
App structure:
|-- app
    |-- __init__.py
|-- testModule
    |-- __init__.py
    |-- tasks.py
|-- celerytest.py
app/__init__.py
from flask import Flask
from celery import Celery

app = Flask(__name__)

BROKER_URL = "amqp://"
RESULT_BACKEND = "rpc://"
TASK_TRACK_STARTED = True
INCLUDE = ['testModule.tasks']


def make_celery(app):
    # Celery() takes the broker URL through the `broker` keyword argument.
    celery = Celery(app.name, broker=BROKER_URL)
    celery.conf.update(
        CELERY_RESULT_BACKEND=RESULT_BACKEND,
        CELERY_TRACK_STARTED=TASK_TRACK_STARTED,
        CELERY_INCLUDE=INCLUDE,
    )
    TaskBase = celery.Task

    class ContextTask(TaskBase):
        abstract = True

        def __call__(self, *args, **kwargs):
            # Run every task inside the Flask application context.
            with app.app_context():
                return TaskBase.__call__(self, *args, **kwargs)

    celery.Task = ContextTask
    return celery


celeryApp = make_celery(app)
testModule/tasks.py
from app import celeryApp
from time import sleep


@celeryApp.task(bind=True)
def add(self, x, y):
    self.update_state(state='PROGRESS')
    sleep(5)
    return x + y
celerytest.py
from flask import request
from celery import chain
from app import app
import testModule.tasks


def getChainIds(node):
    # for a chain like chain(task1, task2, ...), this returns an array of task ids like [task1_id, task2_id]
    id_chain = []
    while node.parent:
        id_chain.append(node.id)
        node = node.parent
    id_chain.append(node.id)
    id_chain.reverse()
    return id_chain


@app.route('/chainAdd', methods=['GET'])
def chainAdd():
    addChain = chain(
        testModule.tasks.add.s(10, 4),
        testModule.tasks.add.s(5),
        testModule.tasks.add.s(3)
    )
    addChainResult = addChain.apply_async()
    chainIds = getChainIds(addChainResult)
    return str(chainIds)


@app.route('/checkChainStatus', methods=['GET'])
def checkChainStatus():
    chainIds = [
        '2a6dbfca-a53b-4beb-8386-14ae06c2b076',
        '8d97e7d1-f06a-4e23-be35-f02c16865c8b',
        'fd112bed-c565-4e82-8806-890da0b952f0',
    ]
    results = []
    for task_id in chainIds:
        results.append(testModule.tasks.add.AsyncResult(task_id).state)
    return str(results)
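The parent-walking logic in getChainIds can be tried without a broker or worker. The sketch below is a minimal stand-in, assuming that apply_async() on a chain returns the result of the last task and that each result links to the previous one through .parent. FakeResult and get_chain_ids are hypothetical names used only for this illustration; they are not part of Celery.

```python
class FakeResult:
    """Hypothetical stand-in for an AsyncResult: it only has .id and .parent."""

    def __init__(self, id, parent=None):
        self.id = id
        self.parent = parent


def get_chain_ids(node):
    """Walk from the last result back to the first, then return ids in chain order."""
    ids = []
    while node is not None:
        ids.append(node.id)
        node = node.parent
    ids.reverse()
    return ids


# Build a three-link chain by hand: first <- second <- last.
first = FakeResult("task1_id")
second = FakeResult("task2_id", parent=first)
last = FakeResult("task3_id", parent=second)

chain_ids = get_chain_ids(last)
print(chain_ids)
```

With real results the same walk starts from whatever apply_async() returned, which is why the list has to be reversed to come out in execution order.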
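After hitting /chainAdd and getting the list of IDs, I have been manually pasting them into the chainIds list in checkChainStatus() for testing. checkChainStatus() only ever returns ['PENDING', 'PENDING', 'PENDING']. Any ideas?
Update: if I change the result backend to amqp, I get the correct task states and results: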
RESULT_BACKEND='amqp'
But why doesn't this work with rpc? My understanding is that rpc is the better backend, so I'd like to use it.
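For context, the Celery documentation describes PENDING as the state reported for any task ID the result backend has no record of, so PENDING does not by itself show that a task is still queued. It also describes the rpc backend as sending results back as messages to the client that initiated the task, rather than storing them where any process can look them up. Whether that accounts for the behaviour in this setup is unverified. The toy model below is not Celery code; ToyStoreBackend and ToyMessageBackend are hypothetical classes that only illustrate the difference between a shared store and per-client delivery.

```python
class ToyStoreBackend:
    """Toy model: states live in one shared store that any client can read."""

    def __init__(self):
        self._states = {}

    def record(self, task_id, state):
        self._states[task_id] = state

    def state(self, task_id):
        # An ID with no record is reported as PENDING.
        return self._states.get(task_id, "PENDING")


class ToyMessageBackend:
    """Toy model: each state is delivered only to the client that sent the task."""

    def __init__(self):
        self._mailboxes = {}  # client name -> {task_id: state}

    def record(self, task_id, state, reply_to):
        self._mailboxes.setdefault(reply_to, {})[task_id] = state

    def state(self, task_id, client):
        # A client that never received the message has no record, so it sees PENDING.
        return self._mailboxes.get(client, {}).get(task_id, "PENDING")


store = ToyStoreBackend()
store.record("abc", "SUCCESS")
store_seen = store.state("abc")

messages = ToyMessageBackend()
messages.record("abc", "SUCCESS", reply_to="sender")
sender_seen = messages.state("abc", client="sender")
other_seen = messages.state("abc", client="someone-else")

print(store_seen, sender_seen, other_seen)
```

In this model a lookup by ID from anything other than the original sender falls through to the PENDING default, even though the task itself succeeded.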