首页 文章

airflow 1.10.0 branchpythonoperator运行失败:Celery命令失败

提问于
浏览
0

我将airflow dag示例 example_branch_dop_operator_v3 代码复制到我自己的dag test1_v2,我可以成功运行example_branch_dop_operator_v3,但运行test1_v2失败了 . dag test1_v2代码(AIRFLOW_HOME / dags / test1.py):

import airflow
from airflow.operators.python_operator import BranchPythonOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.models import DAG

args = {
    'owner': 'airflow',
    'start_date': airflow.utils.dates.days_ago(2),
    'depends_on_past': True,
}

dag = DAG(dag_id='test1_v2'
          schedule_interval='*/1 * * * *', default_args=args)


def should_run(ds, **kwargs):

    print('------------- exec dttm = {} and minute = {}'.
          format(kwargs['execution_date'], kwargs['execution_date'].minute))
    if kwargs['execution_date'].minute % 2 == 0:
        return "oper_1"
    else:
        return "oper_2"


cond = BranchPythonOperator(
    task_id='condition',
    provide_context=True,
    python_callable=should_run,
    dag=dag)

oper_1 = DummyOperator(
    task_id='oper_1',
    dag=dag)
oper_1.set_upstream(cond)

oper_2 = DummyOperator(
    task_id='oper_2',
    dag=dag)
oper_2.set_upstream(cond)

命令 airflow run test1_v2 condition "2018-09-01 00:00:00" ,有工作日志:

[2018-10-11 21:20:29,991] 信息 - 在主机CenT上运行
[2018-10-11 21:23:10,879] INFO - setting.configure_orm():使用池设置 . pool_size = 5,pool_recycle = 1800
[2018-10-11 21:23:11,343] { init .py:51}信息 - 使用执行程序CeleryExecutor
[2018-10-11 21:23:11,572] 信息 - 加载泡菜ID 26
Traceback(最近一次调用最后一次):
文件"/home/airflow/airflow/venv/bin/airflow",第32行,中
args.func(参数)
文件"/home/airflow/airflow/venv/lib/python3.5/site-packages/airflow/utils/cli.py",第74行,在包装器中
返回f(* args,** kwargs)
文件"/home/airflow/airflow/venv/lib/python3.5/site-packages/airflow/bin/cli.py",第480行,在运行中
DagPickle).filter(DagPickle.id == args.pickle).first()
首先是文件"/home/airflow/airflow/venv/lib/python3.5/site-packages/sqlalchemy/orm/query.py",第2755行
ret = list(self [0:1])
文件"/home/airflow/airflow/venv/lib/python3.5/site-packages/sqlalchemy/orm/query.py",第2547行,在 getitem
返回列表(res)
实例中的文件"/home/airflow/airflow/venv/lib/python3.5/site-packages/sqlalchemy/orm/loading.py",第90行
util.raise_from_cause(ERR)
文件"/home/airflow/airflow/venv/lib/python3.5/site-packages/sqlalchemy/util/compat.py",第203行,在raise_from_cause中
reraise(type(exception),exception,tb = exc_tb,cause = cause)
文件"/home/airflow/airflow/venv/lib/python3.5/site-packages/sqlalchemy/util/compat.py",第187行,重新加入
提高 Value
文件"/home/airflow/airflow/venv/lib/python3.5/site-packages/sqlalchemy/orm/loading.py",第75行,实例中
rows = [获取行中的proc(行)]
文件"/home/airflow/airflow/venv/lib/python3.5/site-packages/sqlalchemy/orm/loading.py",第75行,在
rows = [获取行中的proc(行)]
文件"/home/airflow/airflow/venv/lib/python3.5/site-packages/sqlalchemy/orm/loading.py",第452行,在_instance中
loaded_instance,populate_existing,populators)
文件"/home/airflow/airflow/venv/lib/python3.5/site-packages/sqlalchemy/orm/loading.py",第513行,在_populate_full中
dict_ [key] = getter(row)
文件"/home/airflow/airflow/venv/lib/python3.5/site-packages/sqlalchemy/sql/sqltypes.py",第1540行,正在处理中
返回负载(值)
在负载中文件"/home/airflow/airflow/venv/lib/python3.5/site-packages/dill/_dill.py",第316行
返回加载(文件,忽略)
加载文件"/home/airflow/airflow/venv/lib/python3.5/site-packages/dill/_dill.py",第304行
obj = pik.load()
在find_class中的文件"/home/airflow/airflow/venv/lib/python3.5/site-packages/dill/_dill.py",第465行
返回StockUnpickler.find_class(self,module,name)
ImportError:没有名为'unusual_prefix_d47cb71ac291be245f60c8ac0070d906f4627fa1_test1'的模块
[2018-10-11 21:23:11,823:ERROR / ForkPoolWorker-6] execute_command遇到CalledProcessError
Traceback(最近一次调用最后一次):
在execute_command中的文件"/home/airflow/airflow/venv/lib/python3.5/site-packages/airflow/executors/celery_executor.py",第60行
close_fds = True,env = env)
文件"/data/python35/lib/python3.5/subprocess.py",第271行,在check_call中
引发CalledProcessError(retcode,cmd)
subprocess.CalledProcessError:命令'airflow run test1_v1 condition 2018-09-01T10:00:00+08:00 --pickle 26 --local'返回非零退出状态1
[2018-10-11 21:23:11,895:ERROR / ForkPoolWorker-6]无
[2018-10-11 21:23:12,103:ERROR / ForkPoolWorker-6]任务airflow.executors.celery_executor.execute_command [efb4ef09-bdf8-4123-85c8-4dc73dc19d74]引发意外:AirflowException('Celery command failed',)
Traceback(最近一次调用最后一次):
在execute_command中的文件"/home/airflow/airflow/venv/lib/python3.5/site-packages/airflow/executors/celery_executor.py",第60行
close_fds = True,env = env)
在check_call中的文件"/data/python35/lib/python3.5/subprocess.py",第271行
引发CalledProcessError(retcode,cmd)
subprocess.CalledProcessError:命令'airflow run test1_v1 condition 2018-09-01T10:00:00+08:00 --pickle 26 --local'返回非零退出状态1

在处理上述异常期间,发生了另一个异常:

Traceback(最近一次调用最后一次):
在trace_task中的文件"/home/airflow/airflow/venv/lib/python3.5/site-packages/celery/app/trace.py",第375行
R = retval = fun(* args,** kwargs)
文件"/home/airflow/airflow/venv/lib/python3.5/site-packages/celery/app/trace.py",第632行,在 protected_call
return self.run(* args,** kwargs)
在execute_command中的文件"/home/airflow/airflow/venv/lib/python3.5/site-packages/airflow/executors/celery_executor.py",第65行
引发AirflowException('Celery command failed')
airflow.exceptions.AirflowException:Celery命令失败

为什么dag test2_v1会失败?谢谢 .

1 回答

  • 0

    当我用 python_callable=range 替换 python_callable=should_run 时,运行这个dag successfuly,所以我猜原因是气流找不到should_run,因为它显示在日志中 ImportError: No module named 'unusual_prefix_d47cb71ac291be245f60c8ac0070d906f4627fa1_test1'

    解决方案是:

    • 如果你使用命令你应该使用 airflow backfill test1_v2 -s 20180901 -e 20180902 -x documentation

    • 在气流调度程序触发的情况下没有这样的问题

相关问题