首页 文章

如何在气流变量上强制使用jinja模板?

提问于
浏览
2

airflow docs说: You can use Jinja templating with every parameter that is marked as “templated” in the documentation . 有意义的是,气流世界中的特定参数(例如 PythonOperator 中的某些参数)会自动通过气流进行模拟 . 我想知道最好/正确的方法是获得一个非气流变量来模板化 . 我的具体用例类似于:

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from somewhere import export_votes_data, export_queries_data
from elsewhere import ApiCaucus, ApiQueries

dag = DAG('export_training_data', description='Export training data for all active orgs to GCS', schedule_interval=None,
          start_date=datetime(2018, 3, 26), catchup=False)

HOST = "http://api-00a.dev0.solvvy.co"
BUCKET = "gcs://my-bucket-name/{{ ds }}/" # I'd like this to get templated

votes_api = ApiCaucus.get_votes_api(HOST)
queries_api = ApiQueries.get_queries_api(HOST)

export_votes = PythonOperator(task_id="export_votes", python_callable=export_votes_data,
                                  op_args=[BUCKET, votes_api], dag=dag)
export_queries = PythonOperator(task_id="export_queries", python_callable=export_query_data,
                                    op_args=[BUCKET, queries_api, export_solutions.task_id], dag=dag,
                                    provide_context=True)

2 回答

  • 0

    PythonOperatorprovide_context 参数将传递用于模板化的参数 . 来自the documentation

    provide_context(bool) - 如果设置为true,Airflow将传递一组可在函数中使用的关键字参数 . 这组kwargs完全对应于你在jinja模板中可以使用的内容 . 为此,您需要在函数头中定义** kwargs .

    通过为您的callable提供上下文,您可以在函数中进行插值:

    def your_callable(bucket, api, **kwargs):
      bucket = bucket.format(**kwargs)
      [...]
    
  • 1

    运算符的内部方法(执行/ pre_execute / post_execute,以及任何可以获得Airflow context 的内容):

    BUCKET = "gcs://my-bucket-name/{{ ds }}/" # I'd like this to get templated
    
    jinja_context = context['ti'].get_template_context()
    rendered_content = self.render_template('', BUCKET, jinja_context)
    

相关问题