我正在尝试制作一个调用Celery链的测试Flask应用程序,从链任务中获取任务ID,并检查任务的状态 . 从我启动Celery工作器的命令提示符,我可以看到任务成功,但是当我检查任务状态时,它总是“PENDING”,我无法得到结果 .

我正在使用Windows 10和Celery 3.1.25,因为Windows在4.0之后停止了支持

芹菜 Worker 命令:

python -m celery worker -A app.celeryApp --loglevel=info --pool=eventlet

芹菜 Worker 日志:

-------------- celery@xxx v3.1.25 (Cipater)
---- **** -----
--- * ***  * -- Windows-10-10.0.15063-SP0
-- * - **** ---
- ** ---------- [config]
- ** ---------- .> app:         app:0x23dd01de3c8
- ** ---------- .> transport:   amqp://guest:**@localhost:5672//
- ** ---------- .> results:     rpc://
- *** --- * --- .> concurrency: 4 (eventlet)
-- ******* ----
--- ***** ----- [queues]
-------------- .> celery           exchange=celery(direct) key=celery


[tasks]
. testModule.tasks.add

[2018-04-25 15:56:36,510: INFO/MainProcess] Connected to amqp://guest:**@127.0.0.1:5672//
[2018-04-25 15:56:36,527: INFO/MainProcess] mingle: searching for neighbors
[2018-04-25 15:56:37,548: INFO/MainProcess] mingle: all alone
[2018-04-25 15:56:37,568: INFO/MainProcess] pidbox: Connected to amqp://guest:**@127.0.0.1:5672//.
[2018-04-25 15:56:37,570: WARNING/MainProcess] celery@xxx ready.
[2018-04-25 15:56:49,590: INFO/MainProcess] Received task: testModule.tasks.add[2a6dbfca-a53b-4beb-8386-14ae06c2b076]
[2018-04-25 15:56:54,643: INFO/MainProcess] Task testModule.tasks.add[2a6dbfca-a53b-4beb-8386-14ae06c2b076] succeeded in 5.062999999965541s: 14
[2018-04-25 15:56:54,644: INFO/MainProcess] Received task: testModule.tasks.add[8d97e7d1-f06a-4e23-be35-f02c16865c8b]
[2018-04-25 15:56:59,659: INFO/MainProcess] Task testModule.tasks.add[8d97e7d1-f06a-4e23-be35-f02c16865c8b] succeeded in 5.01500000001397s: 19
[2018-04-25 15:56:59,660: INFO/MainProcess] Received task: testModule.tasks.add[fd112bed-c565-4e82-8806-890da0b952f0]
[2018-04-25 15:57:04,659: INFO/MainProcess] Task testModule.tasks.add[fd112bed-c565-4e82-8806-890da0b952f0] succeeded in 5.0s: 22

应用结构:

|-- app
   |-- __init__.py
|-- testModule
   |-- __init__.py
   |-- tasks.py
|-- celerytest.py

应用程序/ __ init__.py

from flask import Flask
from celery import Celery

app = Flask(__name__)

BROKER_URL="amqp://"
RESULT_BACKEND="rpc://"
TASK_TRACK_STARTED=True
INCLUDE=['testModule.tasks']

def make_celery(app):
    celery = Celery(app.name, BROKER_URL=BROKER_URL)
    celery.conf.update(
        CELERY_RESULT_BACKEND = RESULT_BACKEND,
        CELERY_TRACK_STARTED = TASK_TRACK_STARTED,
        CELERY_INCLUDE= INCLUDE
    )
    TaskBase = celery.Task
    class ContextTask(TaskBase):
        abstract = True
        def __call__(self, *args, **kwargs):
            with app.app_context():
                return TaskBase.__call__(self, *args, **kwargs)
    celery.Task = ContextTask
    return celery

celeryApp = make_celery(app)

testModule / tasks.py

from app import celeryApp
from time import sleep

@celeryApp.task(bind=True)
def add(self, x, y):
    self.update_state(state='PROGRESS')
    sleep(5)
    return x + y

celerytest.py

from flask import request
from celery import chain
from app import app
import testModule.tasks

def getChainIds(node):
    # for a chain like chain(task1, task2, ...), this returns an array of task ids like [task1_id, task2_id]
    id_chain = []
    while node.parent:
        id_chain.append(node.id)
        node = node.parent
    id_chain.append(node.id)
    id_chain.reverse()
    return id_chain

@app.route('/chainAdd', methods=['GET'])
def chainAdd():
    addChain = chain(
        testModule.tasks.add.s(10, 4),
        testModule.tasks.add.s(5),
        testModule.tasks.add.s(3)
    )
    addChainResult = addChain.apply_async()
    chainIds = getChainIds(addChainResult)
    return str(chainIds)

@app.route('/checkChainStatus', methods=['GET'])
def checkChainStatus():
    chainIds = ['2a6dbfca-a53b-4beb-8386-14ae06c2b076', '8d97e7d1-f06a-4e23-be35-f02c16865c8b', 'fd112bed-c565-4e82-8806-890da0b952f0']
    results = []
    for id in chainIds:
        results.append(testModule.tasks.add.AsyncResult(id).state)
    return str(results)

在进入/ chainAdd并获取ID列表后,我一直在checkChainStatus()中手动填写chainIds列表进行测试 . checkChainStatus()只返回['PENDING','PENDING','PENDING'] . 有任何想法吗?

Update :如果我将结果后端更改为amqp,我可以获得正确的任务状态和结果

RESULT_BACKEND='amqp'

但是,为什么这不适用于rpc?我认为rpc是优越的后端,所以我想使用它