
Templating `bucket_key` in S3KeySensor in Apache Airflow


Airflow version: 1.9.0

In an Airflow DAG file, I have a PythonOperator task named run_query whose python_callable function pushes the following XCom variable:

kwargs['ti'].xcom_push(key='query_result_loc', value=query_result_loc)
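
For reference, here is a minimal sketch of how the run_query task is wired up (the callable name and the S3 path are illustrative placeholders; only the xcom_push call matches the real code):

from airflow.operators.python_operator import PythonOperator

def run_query_callable(**kwargs):
    # ... execute the query here; the S3 path below is only a placeholder
    query_result_loc = 's3://some-bucket/path/to/result.csv'
    # push the result location so downstream tasks can pull it via XCom
    kwargs['ti'].xcom_push(key='query_result_loc', value=query_result_loc)

run_query = PythonOperator(
    task_id='run_query',
    python_callable=run_query_callable,
    provide_context=True,  # makes 'ti' available in kwargs on Airflow 1.x
    dag=dag,
)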

Later in the same DAG, I have an S3KeySensor task that uses the location above as its bucket_key parameter:

S3KeySensor(task_id = 'check_file_in_s3',
                 bucket_key = '{{  ti.xcom_pull(task_ids="run_query",key="query_result_loc")  }}' ,
                 bucket_name = None,
                 wildcard_match = False,
                 poke_interval=60,
                 timeout=1200,
                 dag = dag
                 )

Now, when I run the DAG (either in test mode or via trigger_dag), the S3KeySensor complains about a missing bucket_name. The complaint comes from this code in the S3KeySensor definition:

class S3KeySensor(BaseSensorOperator):
    """
    Waits for a key (a file-like instance on S3) to be present in a S3 bucket.
    S3 being a key/value it does not support folders. The path is just a key
    a resource.

    :param bucket_key: The key being waited on. Supports full s3:// style url
        or relative path from root level.
    :type bucket_key: str
    :param bucket_name: Name of the S3 bucket
    :type bucket_name: str
    :param wildcard_match: whether the bucket_key should be interpreted as a
        Unix wildcard pattern
    :type wildcard_match: bool
    :param aws_conn_id: a reference to the s3 connection
    :type aws_conn_id: str
    """
    template_fields = ('bucket_key', 'bucket_name')

    @apply_defaults
    def __init__(
                self, bucket_key,
                bucket_name=None,
                wildcard_match=False,
                aws_conn_id='aws_default',
                *args, **kwargs):
            super(S3KeySensor, self).__init__(*args, **kwargs)
            # Parse
            if bucket_name is None:
                parsed_url = urlparse(bucket_key)
                if parsed_url.netloc == '':
                    raise AirflowException('Please provide a bucket_name')
                else:
                    bucket_name = parsed_url.netloc
                    if parsed_url.path[0] == '/':
                        bucket_key = parsed_url.path[1:]
                    else:
                        bucket_key = parsed_url.path
            self.bucket_name = bucket_name
            self.bucket_key = bucket_key

It looks like the template has not been rendered at this stage.

If I comment out the if block, it works fine. Is this a bug, or am I using template fields incorrectly?

Update based on @kaxil's comment:

  • With bucket_name not provided and the 'if' block left as-is, Airflow cannot even detect the DAG. In the UI I see this error: Broken DAG: [/XXXX/YYYY/project_airflow.py] Please provide a bucket_name

  • With bucket_name not provided, but with the following modification to the 'if' block (note that the if parsed_url.netloc == '' check is removed), it works fine:

if bucket_name is None:
    parsed_url = urlparse(bucket_key)
    bucket_name = parsed_url.netloc
    if parsed_url.path[0] == '/':
        bucket_key = parsed_url.path[1:]
    else:
        bucket_key = parsed_url.path
  • With bucket_name provided, it works fine, and the rendered values of bucket_key and bucket_name show up under the Rendered Template tab (see the sketch after this list).
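
For example, a sketch of that last variant, with a placeholder bucket name passed explicitly so __init__ skips the URL parsing (the pulled XCom value then has to be a key relative to that bucket):

from airflow.operators.sensors import S3KeySensor  # import path as of Airflow 1.9

check_file_in_s3 = S3KeySensor(
    task_id='check_file_in_s3',
    bucket_name='my-result-bucket',  # placeholder; with this set, the parsing branch is skipped
    # rendered at execution time because bucket_key is in template_fields
    bucket_key='{{ ti.xcom_pull(task_ids="run_query", key="query_result_loc") }}',
    wildcard_match=False,
    poke_interval=60,
    timeout=1200,
    dag=dag,
)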

1 Answer


You've probably worked this out by now, but it's because you didn't provide an array for task_ids. The format should be task_ids=["run_query"]. Changing it to this solved the problem for me.
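
For example, a sketch of the sensor from the question with just that change applied:

check_file_in_s3 = S3KeySensor(
    task_id='check_file_in_s3',
    # task_ids passed as a list, as described above
    bucket_key='{{ ti.xcom_pull(task_ids=["run_query"], key="query_result_loc") }}',
    bucket_name=None,
    wildcard_match=False,
    poke_interval=60,
    timeout=1200,
    dag=dag,
)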
