我使用的是Python 2.7(sigh),celery == 3.1.19,librabbitmq == 1.6.1,rabbitmq-server-3.5.6-1.noarch和redis 2.8.24(来自redis-cli info) .
我正试图从芹菜 生产环境 者那里向芹菜消费者发送消息,然后在 生产环境 者处获得结果 . 有1个 生产环境 者和1个消费者,但其间有2个rabbitmq(作为经纪人)和1个redis(结果) .
我面临的问题是:
-
在消费者中,我通过async_result = ZipUp.delay(unique_directory)得到一个AsyncResult,但是async_result.ready()永远不会返回True(至少9秒它没有) - 即使对于基本上做的消费者任务也是如此只返回一个字符串 .
-
我可以看到,在rabbitmq管理网页界面中,我的消息是由rabbitmq交换机收到的,但它似乎没有被记录 .
如果我不尝试从AsyncResult返回结果,事情就会奏效!但我有点希望得到调用的结果 - 这很有用:) .
以下是配置细节 .
我们按照以下方式设置Celery以获得退货:
CELERY_RESULT_BACKEND = 'redis://%s' % _SHARED_WRITE_CACHE_HOST_INTERNAL
CELERY_RESULT = Celery('TEST', broker=CELERY_BROKER)
CELERY_RESULT.conf.update(
BROKER_HEARTBEAT=60,
CELERY_RESULT_BACKEND=CELERY_RESULT_BACKEND,
CELERY_TASK_RESULT_EXPIRES=100,
CELERY_IGNORE_RESULT=False,
CELERY_RESULT_PERSISTENT=False,
CELERY_ACCEPT_CONTENT=['json'],
CELERY_TASK_SERIALIZER='json',
CELERY_RESULT_SERIALIZER='json',
)
我们有另一个不期望返回值的Celery配置,并且可以在同一个程序中运行 . 看起来像:
CELERY = Celery('TEST', broker=CELERY_BROKER)
CELERY.conf.update(
BROKER_HEARTBEAT=60,
CELERY_RESULT_BACKEND=CELERY_BROKER,
CELERY_TASK_RESULT_EXPIRES=100,
CELERY_STORE_ERRORS_EVEN_IF_IGNORED=False,
CELERY_IGNORE_RESULT=True,
CELERY_ACCEPT_CONTENT=['json'],
CELERY_TASK_SERIALIZER='json',
CELERY_RESULT_SERIALIZER='json',
)
芹菜 生产环境 者的存根看起来像:
@CELERY_RESULT.task(name='ZipUp', exchange='cognition.workflow.ZipUp_%s' % INTERNAL_VERSION)
def ZipUp(directory): # pylint: disable=invalid-name
""" Task stub """
_unused_directory = directory
raise NotImplementedError
有人提到在这个存根中使用queue =而不是exchange =会更简单 . 任何人都可以确认(我用Google搜索,但在主题上找不到任何内容)?显然你可以使用queue =除非你想使用扇出或类似的东西,因为并非所有的芹菜后端都有交换的概念 .
无论如何,芹菜消费者开始时:
@task(queue='cognition.workflow.ZipUp_%s' % INTERNAL_VERSION, name='ZipUp')
@StatsInstrument('workflow.ZipUp')
def ZipUp(directory): # pylint: disable=invalid-name
'''
Zip all files in directory, password protected, and return the pathname of the new zip archive.
:param directory Directory to zip
'''
try:
LOGGER.info('zipping up {}'.format(directory))
但是“拉链”并没有记录在任何地方 . 我在芹菜服务器上搜索了该字符串的每个(磁盘支持的)文件,并获得了两个命中:/ usr / bin / zip和我的celery任务的代码 - 并且没有日志消息 .
有什么建议?
谢谢阅读!
1 回答
似乎在 生产环境 者中使用以下任务存根解决了问题:
简而言之,它使用queue =而不是exchange = .