
Airflow DAG not running on schedule


I am new to Airflow and have created my first DAG. Here is my DAG code. I want the DAG to start now and then run once a day.

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime.now(),
    'email': ['aaaa@gmail.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    'alamode', default_args=default_args, schedule_interval=timedelta(1))

create_command = "/home/ubuntu/scripts/makedir.sh "

# t1 is the task which will invoke the directory creation shell script
t1 = BashOperator(
    task_id='create_directory',
    bash_command=create_command,
    dag=dag)

run_spiders = "/home/ubuntu/scripts/crawl_spiders.sh "
# t2 is the task which will invoke the spiders
t2 = BashOperator(
    task_id='web_scrawl',
    bash_command=run_spiders,
    dag=dag)

# To set dependency between tasks. 't1' should run before t2
t2.set_upstream(t1)

The DAG is not being picked up by Airflow. I checked the logs, and this is what they contain.

[2017-09-12 18:08:20,220] {jobs.py:343} DagFileProcessor398 INFO - Started process (PID=7001) to work on /home/ubuntu/airflow/dags/alamode.py
[2017-09-12 18:08:20,223] {jobs.py:1521} DagFileProcessor398 INFO - Processing file /home/ubuntu/airflow/dags/alamode.py for tasks to queue
[2017-09-12 18:08:20,223] {models.py:167} DagFileProcessor398 INFO - Filling up the DagBag from /home/ubuntu/airflow/dags/alamode.py
[2017-09-12 18:08:20,262] {jobs.py:1535} DagFileProcessor398 INFO - DAG(s) ['alamode'] retrieved from /home/ubuntu/airflow/dags/alamode.py
[2017-09-12 18:08:20,291] {jobs.py:1169} DagFileProcessor398 INFO - Processing alamode
/usr/local/lib/python2.7/dist-packages/sqlalchemy/sql/default_comparator.py:161: SAWarning: The IN-predicate on "dag_run.dag_id" was invoked with an empty sequence. This results in a contradiction, which nonetheless can be expensive to evaluate.  Consider alternative strategies for improved performance.
  'strategies for improved performance.' % expr)
[2017-09-12 18:08:20,317] {models.py:322} DagFileProcessor398 INFO - Finding 'running' jobs without a recent heartbeat
[2017-09-12 18:08:20,318] {models.py:328} DagFileProcessor398 INFO - Failing jobs without heartbeat after 2017-09-12 18:03:20.318105
[2017-09-12 18:08:20,320] {jobs.py:351} DagFileProcessor398 INFO - Processing /home/ubuntu/airflow/dags/alamode.py took 0.100 seconds

What exactly am I doing wrong? I have tried changing the schedule to schedule_interval=timedelta(minutes=1) to see whether it starts right away, but it still doesn't work. I can see the tasks under the DAG in the Airflow UI, but their schedule state is 'no status'. Please help.

1 Answer


    Solved this issue with the following steps:

    1) Used an earlier start_date and schedule_interval=timedelta(minutes=10). Also used a fixed date instead of datetime.now().
    2) Added the catchup argument to the DAG arguments (the working code below uses catchup=False).
    3) Set the environment variable: export AIRFLOW_HOME=$(pwd)/airflow_home.
    4) Deleted airflow.db.
    5) Moved the new code to the DAGs folder.
    6) Ran the command 'airflow initdb' to create the DB again.
    7) Switched my DAG 'ON' via the UI toggle.
    8) Ran the command 'airflow scheduler'.
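Steps 1 and 2 are the substantive fixes; the rest simply reset Airflow's state. The catchup flag controls backfilling: with a start_date in the past, catchup=True creates one DAG run per missed interval, while catchup=False schedules only the most recent one. A rough sketch of the idea (not Airflow's actual scheduler code; `runs_to_schedule` is a hypothetical helper):

```python
from datetime import datetime, timedelta

def runs_to_schedule(start_date, interval, now, catchup):
    # A run for an interval becomes eligible once the interval has
    # fully elapsed. With catchup=True every missed interval gets a
    # run; with catchup=False only the most recent one does.
    runs = []
    t = start_date
    while t + interval <= now:
        runs.append(t)
        t += interval
    return runs if catchup else runs[-1:]

start = datetime(2017, 9, 10)
now = datetime(2017, 9, 13)
day = timedelta(days=1)

print(runs_to_schedule(start, day, now, catchup=True))   # three runs: 9/10, 9/11, 9/12
print(runs_to_schedule(start, day, now, catchup=False))  # only the latest: 9/12
```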

    Here is the code that now works:

    from airflow import DAG
    from airflow.operators.bash_operator import BashOperator
    from datetime import datetime, timedelta
    
    default_args = {
        'owner': 'airflow',
        'depends_on_past': False,
        'start_date': datetime(2017, 9, 12),
        'email': ['anjana@gapro.tech'],
        'retries': 0,
        'retry_delay': timedelta(minutes=15)
    }
    
    dag = DAG(
        'alamode', catchup=False, default_args=default_args, schedule_interval="@daily")
    
    # t1 is the task which will invoke the directory creation shell script
    t1 = BashOperator(
        task_id='create_directory',
        bash_command='/home/ubuntu/scripts/makedir.sh ',
        dag=dag)
    
    
    # t2 is the task which will invoke the spiders
    t2 = BashOperator(
        task_id='web_crawl',
        bash_command='/home/ubuntu/scripts/crawl_spiders.sh ',
        dag=dag)
    
    # To set dependency between tasks. 't1' should run before t2
    t2.set_upstream(t1)
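The decisive change above is the fixed start_date. The scheduler only triggers a run once a full schedule interval past start_date has elapsed, so with start_date=datetime.now() the first interval never finishes and nothing ever runs. A minimal illustration of that rule (a sketch, not Airflow internals; `first_run_after` is a hypothetical helper):

```python
from datetime import datetime, timedelta

def first_run_after(start_date, interval):
    # The run covering [start_date, start_date + interval) is only
    # triggered once the whole interval has passed.
    return start_date + interval

daily = timedelta(days=1)

# Fixed past start_date: the first run time is already in the past,
# so the scheduler fires it as soon as the DAG is switched on.
print(first_run_after(datetime(2017, 9, 12), daily))  # 2017-09-13 00:00:00

# start_date=datetime.now(): the first run time is always one full
# interval in the future, so it is never reached.
assert first_run_after(datetime.now(), daily) > datetime.now()
```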
    
