首页 文章

如何在气流中设置多个Dag目录

提问于
浏览
1

我为不同的python项目设置了不同的气流dag,即一个父dags文件夹 /vol/dags ,基于不同的python项目的DAG子文件夹: /vol/dags/project1/project1.py, /vol/dags/project2/project2.py where DAGS_FOLDER = /vol/dags .

project1.py 例如从同一目录中的另一个python文件导入一个函数,即 /vol/dags/project1/mycalculator.py . 但是当我启动airflow webserver时,我得到一个 ImportError

/vol/dags/project1/$ airflow webserver -p 8080

INFO - Filling up the DagBag from /vol/dags/
ERROR - Failed to import: /vol/dags/project1/project1.py
Traceback (most recent call last):
  File "/Users/xxx/anaconda/lib/python2.7/site-packages/airflow/models.py", line 247, in process_file
    m = imp.load_source(mod_name, filepath)
  File "/vol/dags/project1/project1.py", line 10, in <module>
    from mycalculator import *
ImportError: No module named mycalculator

我尝试将 mycalculator.py 导入 project1.py ,如下所示:

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators import PythonOperator
from datetime import datetime, timedelta
from mycalculator import *

dag = DAG(
    dag_id='project1', default_args=args,
    schedule_interval="@once")

2 回答

  • 0

    文件夹 /vol/dags/project1/ 缺少 __init__.py 文件 .

    此文件可以为空 .

    添加此文件然后在project2.py中您应该能够:

    import project1.mycalculator.*
    

    有关包装的更多信息,请参见此处:https://docs.python.org/2/tutorial/modules.html#packages

  • 1

    您可以使用packaged dag概念为不同的项目设置不同的dag文件夹 . 您只需要在父dag文件夹中放置每个项目的zip .

    通过这种方式,您可以轻松地将dags与其依赖项相结合,并且您的dag文件夹将整洁干净,因为它只包含每个项目的zip .

    您可以创建一个如下所示的zip:

    my_dag1.py
    my_dag2.py
    package1/__init__.py
    package1/functions.py
    

    你的父dag文件夹看起来像这样:

    project1.zip
    project2.zip
    my_dag3.py
    

相关问题