I have a problem with Airflow: the first task in my DAG always starts and finishes successfully, but the second task never starts automatically.

I tried clearing the task in the UI, but it does not start. The only way I have found to get it running is to delete the running jobs from the metadata database:

delete from job where state='running'

But I do not have many jobs in the running state: only one SchedulerJob (latest heartbeat OK) and 16 ExternalTaskSensors waiting for this DAG.
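
For reference, this is roughly how I inspect that job table from Python instead of raw SQL (just a diagnostic sketch, assuming Airflow 1.10-style imports; it is not part of the DAG):

from airflow import settings
from airflow.jobs import BaseJob

# List the jobs the metadata DB still considers "running".
session = settings.Session()
for job in session.query(BaseJob).filter(BaseJob.state == 'running'):
    print(job.id, job.job_type, job.state, job.latest_heartbeat)
session.close()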

The pool has 150 slots, with 16 running and 1 scheduled.
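
This is roughly how I count what is occupying the 'capser' pool (again a diagnostic sketch against the metadata DB, assuming Airflow 1.10's ORM models):

from sqlalchemy import func

from airflow import settings
from airflow.models import TaskInstance

# Count task instances per state inside the 'capser' pool.
session = settings.Session()
rows = (session.query(TaskInstance.state, func.count(TaskInstance.task_id))
        .filter(TaskInstance.pool == 'capser')
        .group_by(TaskInstance.state)
        .all())
print(rows)  # e.g. [('running', 16), ('scheduled', 1)]
session.close()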

  • I am running the airflow scheduler

  • I am running the airflow webserver

  • All DAGs are set to On in the web UI (see the check sketched right after this list)

  • All DAGs have a start_date in the past

  • I reset the scheduler a few hours ago
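
To double-check the "On" toggle outside the UI, I also look at DagModel directly (a minimal sketch, assuming Airflow 1.10's airflow.models.DagModel):

from airflow import settings
from airflow.models import DagModel

# Verify the DAG is not paused in the metadata DB.
session = settings.Session()
dag_model = (session.query(DagModel)
             .filter(DagModel.dag_id == 'trigger_snapshot')
             .one())
print(dag_model.dag_id, 'is_paused =', dag_model.is_paused)
session.close()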

Here is the DAG code in Airflow:

# Imports added for completeness (paths as in Airflow 1.10); crawler_name,
# snapshot_key and the callables set_exec_dt_variable, start_crawler and
# s3move are defined elsewhere in this module (not shown).
from datetime import datetime, timedelta

from airflow import DAG
from airflow.models import Variable
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from airflow.sensors.s3_key_sensor import S3KeySensor

default_args = {
    'owner': 'airgia',
    'depends_on_past': False,
    'retries': 2,
    'start_date': datetime(2018, 12, 1, 0, 0),
    'email': ['xxxx@yyyy.net'],
    'email_on_failure': False,
    'email_on_retry': False
}

dag = DAG('trigger_snapshot',
          default_args=default_args,
          dagrun_timeout=timedelta(hours=22),
          schedule_interval="0 0 * * 1,2,3,4,5,7",
          max_active_runs=1,
          catchup=False)

set_exec_dt = PythonOperator(
    task_id='set_exec_dt',
    python_callable=set_exec_dt_variable,
    dag=dag,
    pool='capser')

lanza_crawler = PythonOperator(
    task_id='lanza_crawler',
    op_kwargs={"crawler_name": crawler_name},
    python_callable=start_crawler,
    dag=dag,
    pool='capser')

copy_as_processed = PythonOperator(
    task_id='copy_as_processed',
    op_kwargs={"source_bucket": Variable.get("bucket"),
               "source_key": snapshot_key,
               "dest_bucket": Variable.get("bucket"),
               "dest_key": "{0}_processed".format(snapshot_key)},
    python_callable=s3move,
    dag=dag,
    pool='capser')

airflow_snapshot = S3KeySensor(
    task_id='airflow_snapshot',
    bucket_key=snapshot_key,
    wildcard_match=True,
    bucket_name=Variable.get("bucket"),
    timeout=8*60*60,
    poke_interval=120,
    dag=dag,
    pool='capser')


Fin_DAG_TC = DummyOperator(
    task_id='Fin_DAG_TC',
    dag=dag,
    pool='capser')


airflow_snapshot >> lanza_crawler >> set_exec_dt >> copy_as_processed >> Fin_DAG_TC
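
For completeness, the 16 ExternalTaskSensors mentioned above live in other DAGs and wait for this one to finish. They look roughly like this (a sketch only: the downstream dag_id is a placeholder, and I am assuming they point at the final Fin_DAG_TC task):

from datetime import datetime

from airflow import DAG
from airflow.sensors.external_task_sensor import ExternalTaskSensor

# 'downstream_dag_example' is a made-up dag_id; only external_dag_id and
# external_task_id reflect the real setup described above.
downstream_dag = DAG('downstream_dag_example',
                     schedule_interval="0 0 * * 1,2,3,4,5,7",
                     start_date=datetime(2018, 12, 1))

wait_for_trigger_snapshot = ExternalTaskSensor(
    task_id='wait_for_trigger_snapshot',
    external_dag_id='trigger_snapshot',
    external_task_id='Fin_DAG_TC',
    poke_interval=120,
    dag=downstream_dag)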

This is what I see every morning when I connect to the web UI:

[screenshot from the web UI: "operator null"]


[EDIT]

Here is the last log from the scheduler.

Here we can see the call for the second task (lanza_crawler), but not its start.

[2018-12-11 03:50:54,209] {} INFO - Tasks up for execution:
[2018-12-11 03:50:54,240] {} INFO - DAG trigger_snapshot has 0/16 running and queued tasks
[2018-12-11 03:50:54,240] {} INFO - Setting the following tasks to queued state:
[2018-12-11 03:50:54,254] {} INFO - Setting the following tasks to queued state:
[2018-12-11 03:50:54,255] {} INFO - Sending ('trigger_snapshot', 'lanza_crawler', datetime.datetime(2018, 12, 10, 0, 0, tzinfo=), 1) to executor with priority 4 and queue default
[2018-12-11 03:50:54,255] {} INFO - Adding to queue: airflow run trigger_snapshot lanza_crawler 2018-12-10T00:00:00+00:00 --local --pool capser -sd /usr/local/airflow/dags/capser/trigger_snapshot.py
[2018-12-11 03:50:54,262] {} INFO - [celery] queuing ('trigger_snapshot', 'lanza_crawler', datetime.datetime(2018, 12, 10, 0, 0, tzinfo=), 1) through celery, queue=default
[2018-12-11 03:50:54,749] {} INFO - Executor reports trigger_snapshot.airflow_snapshot execution_date=2018-12-10 00:00:00+00:00 as success for try_number 1
/usr/local/airflow/dags/capser/trigger_snapshot.py 1.53s 2018-12-11T03:50:54
...
/usr/local/airflow/dags/capser/trigger_snapshot.py 6866 0.68s 1.54s 2018-12-11T03:56:50

Here is the last log from the worker.

[2018-12-11 03:50:52,718: INFO/ForkPoolWorker-11] Task airflow.executors.celery_executor.execute_command[9a2e1ae7-9264-47d8-85ff-cac32a542708] succeeded in 13847.525094523095s: None
[2018-12-11 03:50:54,505: INFO/MainProcess] Received task: airflow.executors.celery_executor.execute_command[9ff70fc8-45ef-4751-b274-71e242553128]
[2018-12-11 03:50:54,983] {{settings.py:174}} INFO - settings.configure_orm(): Using pool settings. pool_size=5, pool_recycle=1800
[2018-12-11 03:50:55,422] {} INFO - Using executor CeleryExecutor
[2018-12-11 03:50:54,611] {{models.py:271}} INFO - Filling up the DagBag from /usr/local/airflow/dags/capser/DAG_AURORA/DAG_AURORA.py
[2018-12-11 03:50:55,970] {} INFO - Running on host ip----*.eu-west-1.compute.internal