我为不同的python项目设置了不同的气流dag,即一个父dags文件夹 /vol/dags
,基于不同的python项目的DAG子文件夹: /vol/dags/project1/project1.py, /vol/dags/project2/project2.py
where DAGS_FOLDER = /vol/dags
.
project1.py
例如从同一目录中的另一个python文件导入一个函数,即 /vol/dags/project1/mycalculator.py
. 但是当我启动airflow webserver时,我得到一个 ImportError
:
/vol/dags/project1/$ airflow webserver -p 8080
INFO - Filling up the DagBag from /vol/dags/
ERROR - Failed to import: /vol/dags/project1/project1.py
Traceback (most recent call last):
File "/Users/xxx/anaconda/lib/python2.7/site-packages/airflow/models.py", line 247, in process_file
m = imp.load_source(mod_name, filepath)
File "/vol/dags/project1/project1.py", line 10, in <module>
from mycalculator import *
ImportError: No module named mycalculator
我尝试将 mycalculator.py
导入 project1.py
,如下所示:
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators import PythonOperator
from datetime import datetime, timedelta
from mycalculator import *
dag = DAG(
dag_id='project1', default_args=args,
schedule_interval="@once")
2 回答
文件夹
/vol/dags/project1/
缺少__init__.py
文件 .此文件可以为空 .
添加此文件然后在project2.py中您应该能够:
有关包装的更多信息,请参见此处:https://docs.python.org/2/tutorial/modules.html#packages
您可以使用packaged dag概念为不同的项目设置不同的dag文件夹 . 您只需要在父dag文件夹中放置每个项目的zip .
通过这种方式,您可以轻松地将dags与其依赖项相结合,并且您的dag文件夹将整洁干净,因为它只包含每个项目的zip .
您可以创建一个如下所示的zip:
你的父dag文件夹看起来像这样: