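The Airflow docs say: You can use Jinja templating with every parameter that is marked as “templated” in the documentation. It makes sense that specific parameters in the Airflow world (such as certain parameters of PythonOperator) get templated by Airflow automatically. I'm wondering what the best/correct way is to get a non-Airflow variable templated. My specific use case is something like: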
from datetime import datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from somewhere import export_votes_data, export_queries_data
from elsewhere import ApiCaucus, ApiQueries

dag = DAG('export_training_data',
          description='Export training data for all active orgs to GCS',
          schedule_interval=None,
          start_date=datetime(2018, 3, 26),
          catchup=False)

HOST = "http://api-00a.dev0.solvvy.co"
BUCKET = "gcs://my-bucket-name/{{ ds }}/"  # I'd like this to get templated

votes_api = ApiCaucus.get_votes_api(HOST)
queries_api = ApiQueries.get_queries_api(HOST)

export_votes = PythonOperator(task_id="export_votes",
                              python_callable=export_votes_data,
                              op_args=[BUCKET, votes_api],
                              dag=dag)

# Note: export_solutions is not defined in this snippet.
export_queries = PythonOperator(task_id="export_queries",
                                python_callable=export_queries_data,
                                op_args=[BUCKET, queries_api, export_solutions.task_id],
                                dag=dag,
                                provide_context=True)
2 Answers
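The provide_context argument of PythonOperator passes along the arguments that are used for templating. According to the documentation, when it is set to true Airflow passes a set of keyword arguments that match what is available in Jinja templates, so the callable needs to accept **kwargs. With the context supplied to your callable, you can then do the interpolation in your function: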
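The snippet that followed this sentence is not preserved here, so what follows is only a minimal sketch of the idea, assuming the Airflow 1.x behaviour where provide_context=True delivers context values such as ds as keyword arguments. The name BUCKET_TEMPLATE and the body of export_votes_data are hypothetical, and the placeholder is written as {ds} for str.format rather than as the Jinja expression {{ ds }} from the question. The function is called directly at the end to stand in for Airflow invoking it.

```python
# Hypothetical name: a str.format placeholder replaces the Jinja "{{ ds }}".
BUCKET_TEMPLATE = "gcs://my-bucket-name/{ds}/"


def export_votes_data(bucket_template, votes_api, **kwargs):
    """Callable for PythonOperator; context values arrive in **kwargs."""
    # "ds" is the execution date stamp that Airflow places in the context.
    bucket = bucket_template.format(ds=kwargs["ds"])
    # The real export using votes_api is omitted in this sketch.
    return bucket


# Assumed wiring in the DAG file (not executed here):
#   PythonOperator(task_id="export_votes",
#                  python_callable=export_votes_data,
#                  op_args=[BUCKET_TEMPLATE, votes_api],
#                  provide_context=True,
#                  dag=dag)

# Simulate the call Airflow would make, with a sample context value.
result = export_votes_data(BUCKET_TEMPLATE, None, ds="2018-03-26")
```

Doing the interpolation inside the callable keeps the DAG file free of run-specific values: the string is only resolved when the task runs and the context is known.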
Inside an operator's methods (execute / pre_execute / post_execute, and anywhere else you can get hold of the Airflow context):
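The code that belonged to this answer is missing here, so the following is a hedged sketch rather than the answer's own snippet. It renders the question's BUCKET string with the jinja2 library directly against a context dictionary. The helper render_with_context and the free-standing execute function are hypothetical stand-ins; inside a real operator, execute would be a method, and the operator's own render_template helper (whose signature differs between Airflow versions) could be used instead of calling jinja2 by hand.

```python
from jinja2 import Template

BUCKET = "gcs://my-bucket-name/{{ ds }}/"


def render_with_context(template_string, context):
    """Render a Jinja template string against a context mapping."""
    return Template(template_string).render(**context)


def execute(context):
    """Stand-in for an operator's execute(self, context) method."""
    # The context passed to execute carries the same values ("ds", ...)
    # that are available inside templated operator parameters.
    return render_with_context(BUCKET, context)


# Sample context containing only the key this sketch needs.
rendered = execute({"ds": "2018-03-26"})
```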