首页 文章

在Airflow DAG中导入本地模块(python脚本)

提问于
浏览
1

我正在尝试将本地模块(python脚本)导入我的DAG .

目录结构:

- airflow/  
-- dag/  
---- __init__.py  
---- my_DAG.py  
-- script/  
---- subfolder/  
------ __init__.py  
------ local_module.py

my_DAG.py中的示例代码:

#trying to import from local module
from script.subfolder import local_module  

#calling a function in local_module.py  
a = some_function()

我在Airflow中收到错误说'Broken DAG:my_DAG . 没有名为'local_module'的模块 .

我已将Airflow更新为1.9.0,但这并不能解决问题 .

  • 这里有什么解决方案?

  • 我还在某处读到了我可以通过创建插件来解决这个问题 . 任何人都可以指出我怎么做到这一点?

谢谢 .

3 回答

  • 0

    这通常与Airflow的配置方式有关 .

    airflow.cfg 中,确保 airflow_home 中的路径已正确设置为Airflow目录结构所在的路径 .

    然后Airflow扫描所有子文件夹并填充它们,以便找到模块 .

    否则,只需确保您尝试导入的文件夹位于Python路径中:How to use PYTHONPATH

  • 0

    您需要在 script 目录中添加另一个 __init__.py 文件,与 subfolder 处于同一级别 .

    airflow/  
        |_ dag/  
        |    |_ __init__.py  
        |    |_ my_DAG.py  
        |_ script/
             |_ __init__.py          <----- This here
             |_  subfolder/  
                  |_  __init__.py  
                  |_  local_module.py
    
  • 0

    我这样做的方式如下:

    • 使用main()函数在子文件夹中创建Python脚本 .
      dag文件中的
    • 包含子文件夹和文件的路径声明

    现在您可以在 PythonOperator 中使用此脚本

    import sys
    sys.path.insert(0,"/root/airflow/dags/subfolder"))
    import subfolder.script_name as script
    ...    
    t1=PythonOperator(
        task_id='python_script',
        python_callable=script.main,
        dag=dag
    )
    

相关问题