首页 文章

Celery,RabbitMQ,Redis:芹菜消息进入交换,但没有排队?

提问于
浏览
0

我使用的是Python 2.7(sigh),celery == 3.1.19,librabbitmq == 1.6.1,rabbitmq-server-3.5.6-1.noarch和redis 2.8.24(来自redis-cli info) .

我正试图从芹菜 生产环境 者那里向芹菜消费者发送消息,然后在 生产环境 者处获得结果 . 有1个 生产环境 者和1个消费者,但其间有2个rabbitmq(作为经纪人)和1个redis(结果) .

我面临的问题是:

  • 在消费者中,我通过async_result = ZipUp.delay(unique_directory)得到一个AsyncResult,但是async_result.ready()永远不会返回True(至少9秒它没有) - 即使对于基本上做的消费者任务也是如此只返回一个字符串 .

  • 我可以看到,在rabbitmq管理网页界面中,我的消息是由rabbitmq交换机收到的,但它似乎没有被记录 .

如果我不尝试从AsyncResult返回结果,事情就会奏效!但我有点希望得到调用的结果 - 这很有用:) .

以下是配置细节 .

我们按照以下方式设置Celery以获得退货:

CELERY_RESULT_BACKEND = 'redis://%s' % _SHARED_WRITE_CACHE_HOST_INTERNAL
CELERY_RESULT = Celery('TEST', broker=CELERY_BROKER)
CELERY_RESULT.conf.update(
    BROKER_HEARTBEAT=60,
    CELERY_RESULT_BACKEND=CELERY_RESULT_BACKEND,
    CELERY_TASK_RESULT_EXPIRES=100,
    CELERY_IGNORE_RESULT=False,
    CELERY_RESULT_PERSISTENT=False,
    CELERY_ACCEPT_CONTENT=['json'],
    CELERY_TASK_SERIALIZER='json',
    CELERY_RESULT_SERIALIZER='json',
    )

我们有另一个不期望返回值的Celery配置,并且可以在同一个程序中运行 . 看起来像:

CELERY = Celery('TEST', broker=CELERY_BROKER)
CELERY.conf.update(
   BROKER_HEARTBEAT=60,
   CELERY_RESULT_BACKEND=CELERY_BROKER,
   CELERY_TASK_RESULT_EXPIRES=100,
   CELERY_STORE_ERRORS_EVEN_IF_IGNORED=False,
   CELERY_IGNORE_RESULT=True,
   CELERY_ACCEPT_CONTENT=['json'],
   CELERY_TASK_SERIALIZER='json',
   CELERY_RESULT_SERIALIZER='json',
   )

芹菜 生产环境 者的存根看起来像:

@CELERY_RESULT.task(name='ZipUp', exchange='cognition.workflow.ZipUp_%s' % INTERNAL_VERSION)
def ZipUp(directory): # pylint: disable=invalid-name
    """ Task stub """
    _unused_directory = directory
    raise NotImplementedError

有人提到在这个存根中使用queue =而不是exchange =会更简单 . 任何人都可以确认(我用Google搜索,但在主题上找不到任何内容)?显然你可以使用queue =除非你想使用扇出或类似的东西,因为并非所有的芹菜后端都有交换的概念 .

无论如何,芹菜消费者开始时:

@task(queue='cognition.workflow.ZipUp_%s' % INTERNAL_VERSION, name='ZipUp')
@StatsInstrument('workflow.ZipUp')
def ZipUp(directory): # pylint: disable=invalid-name
    '''
    Zip all files in directory, password protected, and return the pathname of the new zip archive.
    :param directory Directory to zip
    '''
    try:
        LOGGER.info('zipping up {}'.format(directory))

但是“拉链”并没有记录在任何地方 . 我在芹菜服务器上搜索了该字符串的每个(磁盘支持的)文件,并获得了两个命中:/ usr / bin / zip和我的celery任务的代码 - 并且没有日志消息 .

有什么建议?

谢谢阅读!

1 回答

  • 0

    似乎在 生产环境 者中使用以下任务存根解决了问题:

    @CELERY_RESULT.task(name='ZipUp', queue='cognition.workflow.ZipUp_%s' % INTERNAL_VERSION)
    def ZipUp(directory): # pylint: disable=invalid-name
        """ Task stub """
        _unused_directory = directory
        raise NotImplementedError
    

    简而言之,它使用queue =而不是exchange = .

相关问题