首页 文章

气流中的Python脚本调度

提问于
浏览
4

嗨,大家好,

我需要使用气流安排我的python files(which contains data extraction from sql and some joins) . 我已经成功地将气流安装到我的linux服务器中,我可以使用气流网络服务器 . 但即使在完成文件后我也不清楚 where exactly I need to write script for scheduling and how will that script be available into airflow webserver so I could see the status

就配置而言,我知道dag文件夹在我的主目录中的位置以及示例dags所在的位置 .

Note: 请不要将此标记为重复,如何在Airflow中运行bash脚本文件,因为我需要运行位于不同位置的python文件 .

请在Airflow网络服务器中找到以下配置:

enter image description here

以下是AIRFLOW_HOME目录中dag文件夹的屏幕截图

enter image description here

还可以在下面找到DAG创建屏幕截图和Missing DAG错误的屏幕截图

enter image description here

enter image description here

在我选择简单DAG之后,填充了丢失DAG的错误

enter image description here

4 回答

  • 6

    您应该使用 PythonOperator 来调用您的函数 . 如果您想在其他地方定义该功能,只需在 PYTHONPATH 中访问该模块即可从模块中导入 .

    from airflow import DAG
    from airflow.operators.python_operator import PythonOperator
    
    from my_script import my_python_function
    
    dag = DAG('tutorial', default_args=default_args)
    
    PythonOperator(dag=dag,
                   task_id='my_task_powered_by_python',
                   provide_context=False,
                   python_callable=my_python_function,
                   op_args=['arguments_passed_to_callable'],
                   op_kwargs={'keyword_argument':'which will be passed to function'})
    

    如果您的函数 my_python_function 在脚本文件中 /path/to/my/scripts/dir/my_script.py

    然后在启动Airflow之前,您可以将脚本的路径添加到 PYTHONPATH ,如下所示:

    export PYTHONPATH=/path/to/my/scripts/dir/:$PYTHONPATH
    

    更多信息:https://airflow.incubator.apache.org/code.html#airflow.operators.PythonOperator

    默认args和教程中的其他注意事项:https://airflow.incubator.apache.org/tutorial.html

  • 0

    您还可以使用bashoperator在Airflow中执行python脚本 . 您可以将脚本放在DAG文件夹中的文件夹中 . 如果您的脚本位于其他位置,只需提供这些脚本的路径即可 .

    from airflow import DAG
        from airflow.operators import BashOperator,PythonOperator
        from datetime import datetime, timedelta
    
        seven_days_ago = datetime.combine(datetime.today() - timedelta(7),
                                          datetime.min.time())
    
        default_args = {
            'owner': 'airflow',
            'depends_on_past': False,
            'start_date': seven_days_ago,
            'email': ['airflow@airflow.com'],
            'email_on_failure': False,
            'email_on_retry': False,
            'retries': 1,
            'retry_delay': timedelta(minutes=5),
          )
    
        dag = DAG('simple', default_args=default_args)
    t1 = BashOperator(
        task_id='testairflow',
        bash_command='python /home/airflow/airflow/dags/scripts/file1.py',
        dag=dag)
    
  • 4

    Airflow解析$ AIRFLOW_HOME / dags中的所有Python文件(在你的情况下为/ home / amit / airflow / dags) . 并且该python脚本应该返回一个DAG对象,如“postrational”的回答所示 . 现在当它被报告为缺失时,这意味着Python代码中存在一些问题,并且Airflow无法加载它 . 检查气流网络服务器或调度程序日志以获取更多详细信息,因为stderr或stdout会在那里 .

  • 1

    airflow webserver -p <port>

    它自动加载示例dags,可以在$ HOME / airflow / airflow.cfg中禁用

    `load_examples = False`
    
    • 在$ HOME / airflow /中创建 dags 文件夹,将tutorial.py文件放在 dags 文件夹中https://airflow.incubator.apache.org/tutorial.html

    • 做一些实验,在tutorial.py中进行更改 . 如果您将schedule_interval作为cron语法,那么 'start_date' : datetime(2017, 7, 7)

    'start_date': datetime.now()
    

    dag = DAG('tutorial', default_args=default_args,schedule_interval="@once")dag = DAG('tutorial', default_args=default_args,schedule_interval="* * * * *") # schedule each minute

    • 启动气流: $ airflow webserver -p <port>

    • 启动调度程序: $ airflow scheduler

相关问题