首页 文章

初始化不同 Value 的芹菜 Worker

提问于
浏览
7

我正在使用芹菜在Hadoop上运行长时间运行的任务 . 每个任务在Hadoop上执行一个Pig脚本,运行大约30分钟 - 2小时 .

我当前的Hadoop设置有4个队列a,b,c和默认值 . 所有任务当前都由单个工作程序执行,该工作程序将作业提交到单个队列 .

我想再添加3个将作业提交到其他队列的工作者,每个队列一个工作者 .

问题是队列目前是硬编码的,我希望每个 Worker 都有这个变量 .

我搜索了很多,但我无法找到一种方法来传递每个芹菜工作者不同的队列值并在我的任务中访问它 .

我这样开始我的芹菜 Worker .

celery -A app.celery worker

我希望在命令行本身传递一些额外的参数并在我的任务中访问它,但芹菜抱怨它不理解我的自定义参数 .

我计划通过设置 --concurrency=3 参数来运行同一主机上的所有worker . 有没有解决这个问题的方法?

谢谢!

EDIT

目前的情况是这样的 . 我每次尝试通过说 tasks.print_something.delay() 执行任务print_something它只打印队列C.

@celery.task()
def print_something():
    print "C"

我需要让工作人员根据我在启动时传递给他们的值来打印一个可变字母 .

@celery.task()
def print_something():
    print "<Variable Value Per Worker Here>"

2 回答

  • 3

    我通常做的是,在另一个脚本(比如manage.py)中启动worker(任务未执行)后,我添加带参数的命令来启动具有不同参数的特定任务或任务 .

    在manager.py中:

    from tasks import some_task
    
    @click.command
    def run_task(params):
        some_task.apply_async(params)
    

    这将根据需要启动任务 .

  • 3

    希望这有助于某人 .

    解决这个问题需要多个问题 .

    第一步涉及在celery中为自定义参数添加支持 . 如果不这样做,芹菜会抱怨它不理解参数 .

    因为我用Flask运行芹菜,所以我就像这样初始化芹菜 .

    def configure_celery():
        app.config.update(
            CELERY_BROKER_URL='amqp://:@localhost:5672',
            RESULT_BACKEND='db+mysql://root:@localhost:3306/<database_name>'            
        )
        celery = Celery(app.import_name, backend=app.config['RESULT_BACKEND'],
                        broker=app.config['CELERY_BROKER_URL'])
        celery.conf.update(app.config)
        TaskBase = celery.Task
    
        class ContextTask(TaskBase):
            abstract = True
    
            def __call__(self, *args, **kwargs):
                with app.app_context():
                    return TaskBase.__call__(self, *args, **kwargs)
    
        celery.Task = ContextTask
        return celery
    

    我将此函数称为初始化芹菜,并将其存储在名为celery的变量中 .

    celery = configure_celery()
    

    要添加自定义参数,您需要执行以下操作 .

    def add_hadoop_queue_argument_to_worker(parser):
        parser.add_argument(
            '--hadoop-queue', help='Hadoop queue to be used by the worker'
        )
    

    下面使用的芹菜是我们从上述步骤中获得的芹菜 .

    celery.user_options['worker'].add(add_hadoop_queue_argument_to_worker)
    

    下一步是在工作者中访问此参数 . 为此,请按照以下步骤操作 .

    class HadoopCustomWorkerStep(bootsteps.StartStopStep):
    
        def __init__(self, worker, **kwargs):
            worker.app.hadoop_queue = kwargs['hadoop_queue']
    

    告知芹菜使用此类来创建工作者 .

    celery.steps['worker'].add(HadoopCustomWorkerStep)
    

    现在,任务应该能够访问变量 .

    @app.task(bind=True)
    def print_hadoop_queue_from_config(self):
        print self.app.hadoop_queue
    

    通过在命令行上运行worker来验证它 .

    celery -A app.celery worker --concurrency=1 --hadoop-queue=A -n aworker@%h
    celery -A app.celery worker --concurrency=1 --hadoop-queue=B -n bworker@%h
    celery -A app.celery worker --concurrency=1 --hadoop-queue=C -n cworker@%h
    celery -A app.celery worker --concurrency=1 --hadoop-queue=default -n defaultworker@%h
    

相关问题