
Airflow DAG prevents other DAGs from starting


We are running Airflow v1.9.0 and are having a problem with one DAG. This SSHOperator DAG (called matching) starts at 0 6 * * * and usually completes within an hour. Once a month we have a large data extract that causes this task to take 7 hours. Unfortunately, when that happens, the DAG blocks our other DAGs from starting until it completes; it is the only DAG running during those 7 hours. This is not the normal behavior for our other DAGs (they keep running, and other DAGs still start alongside them). We could not find any table locks (PostgreSQL) that might be causing this.

from airflow import DAG
from airflow.contrib.operators.ssh_operator import SSHOperator
from datetime import datetime, timedelta
from os import path

PATH = '/path/to/code/'

default_args = {
    'owner': 'airflow',
    'depends_on_past': True,
    'start_date': datetime(2018, 1, 24),
    'email': ['myemail@emails.com'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=1),
    }

with open(path.join(PATH, 'matching', 'run_nightly_matching.sh')) as f:
    nightly_matching_command = f.read()

dag = DAG('matching',
          template_searchpath = PATH,
          schedule_interval = '0 6 * * *', #10:00PM
          default_args = default_args)

nightly_matching = SSHOperator(task_id = 'nightly_matching',
                               ssh_conn_id = 'external_server_name',
                               command = nightly_matching_command,
                               do_xcom_push = True,
                               dag = dag)

Below are the query results from inspecting the Airflow database.

airflow=# select task_id, dag_id, start_date, execution_date,end_date, duration, state, pool, queue from task_instance where date_trunc('day', start_date)::DATE = '2018-05-09'::DATE order by start_date;

 task_id          | dag_id   | start_date                 | execution_date      | end_date                   | duration     | state   | pool | queue
------------------+----------+----------------------------+---------------------+----------------------------+--------------+---------+------+---------
 nightly_matching | matching | 2018-05-09 06:00:04.486709 | 2018-05-08 06:00:00 | 2018-05-09 12:50:52.509942 | 24648.023233 | success |      | default

The DAG scheduled immediately after the problem DAG was due to start at 10:40.

 task_id | dag_id | start_date                 | execution_date      | end_date                   | duration | state   | pool | queue
---------+--------+----------------------------+---------------------+----------------------------+----------+---------+------+---------
 Task1   | dag2   | 2018-05-09 12:51:04.963004 | 2018-05-08 10:40:00 | 2018-05-09 12:51:07.060369 | 2.097365 | success |      | default

run_nightly_matching.sh runs a Python script that uses psycopg2 to connect to our database and match pairs of tables.

*** Reading local log.
[2018-05-09 06:00:04,352] {cli.py:374} INFO - Running on host airflow
[2018-05-09 06:00:04,486] {models.py:1197} INFO - Dependencies all met for <TaskInstance: matching.nightly_matching 2018-05-08 06:00:00 [queued]>
[2018-05-09 06:00:04,587] {models.py:1197} INFO - Dependencies all met for <TaskInstance: matching.nightly_matching 2018-05-08 06:00:00 [queued]>
[2018-05-09 06:00:04,588] {models.py:1407} INFO - 
--------------------------------------------------------------------------------
Starting attempt 1 of 2
--------------------------------------------------------------------------------

[2018-05-09 06:00:04,617] {models.py:1428} INFO - Executing <Task(SSHOperator): nightly_matching> on 2018-05-08 06:00:00
[2018-05-09 06:00:04,617] {base_task_runner.py:115} INFO - Running: ['bash', '-c', u'airflow run matching nightly_matching 2018-05-08T06:00:00 --job_id 274416 --raw -sd DAGS_FOLDER/matching.py']
[2018-05-09 06:00:04,981] {base_task_runner.py:98} INFO - Subtask: [2018-05-09 06:00:04,980] {driver.py:120} INFO - Generating grammar tables from /usr/lib/python2.7/lib2to3/Grammar.txt
[2018-05-09 06:00:05,023] {base_task_runner.py:98} INFO - Subtask: [2018-05-09 06:00:05,022] {driver.py:120} INFO - Generating grammar tables from /usr/lib/python2.7/lib2to3/PatternGrammar.txt
[2018-05-09 06:00:05,140] {base_task_runner.py:98} INFO - Subtask: [2018-05-09 06:00:05,139] {__init__.py:45} INFO - Using executor LocalExecutor
[2018-05-09 06:00:05,190] {base_task_runner.py:98} INFO - Subtask: [2018-05-09 06:00:05,190] {models.py:189} INFO - Filling up the DagBag from /home/airflow/airflow/dags/matching.py
[2018-05-09 06:00:05,445] {base_task_runner.py:98} INFO - Subtask: [2018-05-09 06:00:05,444] {base_hook.py:80} INFO - Using connection to: external_server_name
[2018-05-09 06:00:05,511] {base_task_runner.py:98} INFO - Subtask: [2018-05-09 06:00:05,509] {transport.py:1687} INFO - Connected (version 2.0, client OpenSSH_6.6.1p1)
[2018-05-09 06:00:05,621] {base_task_runner.py:98} INFO - Subtask: [2018-05-09 06:00:05,620] {transport.py:1687} INFO - Authentication (publickey) successful!
[2018-05-09 12:50:52,431] {base_task_runner.py:98} INFO - Subtask: [2018-05-09 12:50:52,430] {ssh_operator.py:113} INFO - Matching: ('table_Z', 'table_Y')
[2018-05-09 12:50:52,432] {base_task_runner.py:98} INFO - Subtask: Started at: 2018-05-08 23:00:58.172398
[2018-05-09 12:50:52,433] {base_task_runner.py:98} INFO - Subtask: Finished at: 2018-05-08 23:17:19.273990
[2018-05-09 12:50:52,433] {base_task_runner.py:98} INFO - Subtask: Matching: ('table_Y', 'table_R')
[2018-05-09 12:50:52,433] {base_task_runner.py:98} INFO - Subtask: Started at: 2018-05-08 23:17:19.274092
[2018-05-09 12:50:52,433] {base_task_runner.py:98} INFO - Subtask: Finished at: 2018-05-09 03:16:46.339119
[2018-05-09 12:50:52,434] {base_task_runner.py:98} INFO - Subtask: Matching: ('table_X', 'table_Y')
[2018-05-09 12:50:52,434] {base_task_runner.py:98} INFO - Subtask: Started at: 2018-05-09 03:16:46.339228
[2018-05-09 12:50:52,434] {base_task_runner.py:98} INFO - Subtask: Finished at: 2018-05-09 04:49:16.901285
[2018-05-09 12:50:52,435] {base_task_runner.py:98} INFO - Subtask: Matching: ('table_X', 'clients')
[2018-05-09 12:50:52,435] {base_task_runner.py:98} INFO - Subtask: Started at: 2018-05-09 04:49:16.901410
[2018-05-09 12:50:52,435] {base_task_runner.py:98} INFO - Subtask: Finished at: 2018-05-09 05:02:30.502418
[2018-05-09 12:50:52,435] {base_task_runner.py:98} INFO - Subtask: Matching: ('clients', 'table_R')
[2018-05-09 12:50:52,436] {base_task_runner.py:98} INFO - Subtask: Started at: 2018-05-09 05:02:30.502494
[2018-05-09 12:50:52,436] {base_task_runner.py:98} INFO - Subtask: Finished at: 2018-05-09 05:02:47.035880
[2018-05-09 12:50:52,436] {base_task_runner.py:98} INFO - Subtask: Matching: ('table_Y', 'table_B')
[2018-05-09 12:50:52,436] {base_task_runner.py:98} INFO - Subtask: Started at: 2018-05-09 05:02:47.035974
[2018-05-09 12:50:52,437] {base_task_runner.py:98} INFO - Subtask: Finished at: 2018-05-09 05:03:17.464931
[2018-05-09 12:50:52,437] {base_task_runner.py:98} INFO - Subtask: Matching: ('table_E', 'table_Y')
[2018-05-09 12:50:52,437] {base_task_runner.py:98} INFO - Subtask: Started at: 2018-05-09 05:03:17.465061
[2018-05-09 12:50:52,437] {base_task_runner.py:98} INFO - Subtask: Finished at: 2018-05-09 05:43:05.543177
[2018-05-09 12:50:52,438] {base_task_runner.py:98} INFO - Subtask: Matching: ('table_P', 'table_Y')
[2018-05-09 12:50:52,438] {base_task_runner.py:98} INFO - Subtask: Started at: 2018-05-09 05:43:05.543298
[2018-05-09 12:50:52,438] {base_task_runner.py:98} INFO - Subtask: Finished at: 2018-05-09 05:51:42.683216

airflow.cfg (via ansible)

1 Answer


    I think this could be connected to how often the code is executed, and specifically to this bit:

    with open(path.join(PATH, 'matching', 'run_nightly_matching.sh')) as f:
        nightly_matching_command = f.read()
    

    If I'm not mistaken, this bit of code can execute far more often than the schedule, because it is not part of a task and is not triggerable. It runs every time the Python file itself is executed, i.e. on every parse of the DAG file. Depending on your setup, that may happen rarely or very often. Since it opens a file handle each time, it may have something to do with the problem. It's probably worth checking.
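    One way to avoid the parse-time file read, as a sketch: the SSHOperator in Airflow 1.9 templates its `command` field and (if I recall correctly) lists `.sh` in its `template_ext`, so passing the script's path relative to `template_searchpath` should make Airflow load the file only when the task renders, not on every scheduler parse of the DAG file. Worth verifying against your Airflow version before relying on it:

    ```python
    from datetime import datetime

    from airflow import DAG
    from airflow.contrib.operators.ssh_operator import SSHOperator

    PATH = '/path/to/code/'

    dag = DAG('matching',
              template_searchpath=PATH,
              schedule_interval='0 6 * * *',
              start_date=datetime(2018, 1, 24))

    # Pass the script's path instead of its contents. Because `command` is a
    # templated field and '.sh' is in SSHOperator's template_ext, Airflow
    # reads the file when the task instance renders (at run time), rather
    # than every time the scheduler parses this DAG file.
    nightly_matching = SSHOperator(task_id='nightly_matching',
                                   ssh_conn_id='external_server_name',
                                   command='matching/run_nightly_matching.sh',
                                   do_xcom_push=True,
                                   dag=dag)
    ```
    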

    Also, I can't look at the cfg file, since it's stored on Dropbox and I would have to log in.

    The number of threads allowed by your concurrency settings could be interesting here. It sounds like a blocking problem. It could also be that the DAG itself is not stopping the other tasks, but that the load rises enough to consume too many resources. Hard to say without specifics.
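    If concurrency turns out to be the issue, the scheduler limits in airflow.cfg are worth checking (`parallelism`, `dag_concurrency`, and `max_active_runs_per_dag` in the `[core]` section). It can also help to isolate the long task in its own pool so it occupies a dedicated slot. A minimal sketch, assuming a hypothetical pool named `matching_pool` has already been created with one slot under Admin -> Pools in the web UI:

    ```python
    from airflow.contrib.operators.ssh_operator import SSHOperator

    # Assigning the long-running task to a dedicated one-slot pool caps how
    # many instances of it can run concurrently; when the monthly 7-hour run
    # overlaps later schedules, queued instances wait on the pool slot
    # instead of piling onto the executor's general capacity.
    nightly_matching = SSHOperator(task_id='nightly_matching',
                                   ssh_conn_id='external_server_name',
                                   command=nightly_matching_command,
                                   pool='matching_pool',  # hypothetical pool, 1 slot
                                   do_xcom_push=True,
                                   dag=dag)
    ```

    Note that `depends_on_past=True` in the DAG's default_args already serializes runs of this task; the pool only changes where the queued instances wait.
    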
