Airflow S3KeySensor - How to make it continue running

With the help of this Stackoverflow post I just made a program (the one shown in the post) where, when a file is placed inside an S3 bucket, a task in one of my running DAGs is triggered, and then I perform some work using the BashOperator. Once it's done, though, the DAG is no longer in a running state but instead goes into a success state, and if I want to have it pick up another file I need to clear all the 'Past', 'Upstream', 'Downstream' activity. I would like to make this program so that it's always running, and anytime a new file is placed inside the S3 bucket the program kicks off the tasks.

Can I keep using the S3KeySensor to do this, or do I need to find a way to set up an External Trigger to run my DAG? As of now my S3KeySensor is pretty pointless if it only ever runs once.

from airflow import DAG
from airflow.operators import SimpleHttpOperator, HttpSensor, EmailOperator, S3KeySensor
from datetime import datetime, timedelta
from airflow.operators.bash_operator import BashOperator

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2018, 5, 29),
    'email': ['something@here.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 5,
    'retry_delay': timedelta(minutes=5)
}

# '@once' means the scheduler creates exactly one DAG run, so the sensor only fires once.
dag = DAG('s3_triggered_emr_cluster_dag', default_args=default_args, schedule_interval='@once')

# This Activity runs a Python script that creates an AWS EMR cluster and then does EMR activity on the EMR cluster.
t2 = BashOperator(
    task_id='create_emr_cluster_1',
    bash_command='python /home/ec2-user/aws-python-sample/Create_EMR_Then_Do_EMR_Activities.py',
    retries=1,
    dag=dag)

# Appends a success line to a log file once the sensor has fired.
t1 = BashOperator(
    task_id='success_log',
    bash_command='echo "Dag ran successfully" >> /home/ec2-user/s3_triggered_dag.txt',
    dag=dag)

# Waits for any key matching the wildcard to appear in the bucket, poking
# every 2 minutes for up to 18 hours; s3_conn_id names the Airflow S3 connection.
sensor = S3KeySensor(
    task_id='new_s3_file_in_foobar-bucket',
    bucket_key='*',
    wildcard_match=True,
    bucket_name='foobar-bucket',
    s3_conn_id='s3://foobar-bucket',
    timeout=18*60*60,
    poke_interval=120,
    dag=dag)

# Ordering: sensor -> t1 -> t2
t1.set_upstream(sensor)
t2.set_upstream(t1)
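Since schedule_interval='@once' means the scheduler only ever creates a single DAG run, one lightweight workaround (a sketch, not from the original post) is to give the DAG a frequent schedule and let each scheduled run block in its sensor until a file shows up. The 10-minute interval below is an arbitrary example value:

# Sketch: replace the DAG definition above so a fresh sensor -> t1 -> t2
# cycle starts after each completed run instead of only once.
dag = DAG('s3_triggered_emr_cluster_dag',
          default_args=default_args,
          schedule_interval=timedelta(minutes=10))

Note this assumes processed files are moved or deleted after each run; otherwise the '*' wildcard sensor will immediately match the old file on the next run.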

I know it would no longer be a Directed Acyclic Graph, but it would have a loop that repeats sensor -> t1 -> t2 -> sensor -> t1 -> t2 -> sensor -> ... and keeps repeating.
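One way to get that loop without putting an actual cycle in the graph is to have the final task re-trigger the DAG, so each individual run stays acyclic while the runs chain indefinitely. A minimal sketch against the Airflow 1.x API, added at the bottom of the same DAG file (the task id retrigger_self and the callable name are illustrative, not from the post):

from airflow.operators.dagrun_operator import TriggerDagRunOperator

def re_run_dag(context, dag_run_obj):
    # Returning the run object tells Airflow to go ahead with the new run.
    return dag_run_obj

t3 = TriggerDagRunOperator(
    task_id='retrigger_self',
    trigger_dag_id='s3_triggered_emr_cluster_dag',  # this DAG's own dag_id
    python_callable=re_run_dag,
    dag=dag)

t3.set_upstream(t2)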

Update:

My use case is pretty simple: anytime a new file is placed inside a designated AWS S3 bucket, I want my DAG to be triggered and to start running its various tasks. Those tasks will do things like instantiate a new AWS EMR cluster, extract the files from the AWS S3 bucket, perform some AWS EMR activities, and then shut down the AWS EMR cluster. From there the DAG would go back into a waiting state, waiting for a new file to arrive in the AWS S3 bucket, and then repeat the process indefinitely.
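If the sensor route is dropped in favor of an External Trigger, one common pattern is an S3 event notification invoking an AWS Lambda that starts a DAG run. A hedged sketch against Airflow 1.10's experimental REST API (the AIRFLOW_URL placeholder and handler name are assumptions; the experimental API must be enabled and reachable from the Lambda):

import json
import urllib.request

AIRFLOW_URL = 'http://my-airflow-host:8080'  # placeholder for the real webserver URL
DAG_ID = 's3_triggered_emr_cluster_dag'

def handler(event, context):
    # Pull the bucket and key out of the S3 event and hand them to the DAG run.
    s3 = event['Records'][0]['s3']
    conf = {'bucket': s3['bucket']['name'], 'key': s3['object']['key']}
    req = urllib.request.Request(
        '%s/api/experimental/dags/%s/dag_runs' % (AIRFLOW_URL, DAG_ID),
        data=json.dumps({'conf': conf}).encode('utf-8'),
        headers={'Content-Type': 'application/json'})
    with urllib.request.urlopen(req) as resp:
        return resp.read().decode('utf-8')

With this approach the DAG would be defined with schedule_interval=None and no sensor task at all; the bucket and key arrive in each run's conf.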
