Airflow version: 1.9.0
In my Airflow DAG file, I have a PythonOperator task named run_query that sets the following XCom variable in its python_callable function:
kwargs['ti'].xcom_push(key='query_result_loc', value=query_result_loc)
Later in the same DAG, I have an S3KeySensor task that uses the location above as its bucket_key parameter:
S3KeySensor(task_id='check_file_in_s3',
            bucket_key='{{ ti.xcom_pull(task_ids="run_query",key="query_result_loc") }}',
            bucket_name=None,
            wildcard_match=False,
            poke_interval=60,
            timeout=1200,
            dag=dag
            )
Now, when I run the DAG (in test mode or in trigger_dag mode), the S3KeySensor complains about a missing bucket_name. The error comes from this code in the S3KeySensor definition:
class S3KeySensor(BaseSensorOperator):
    """
    Waits for a key (a file-like instance on S3) to be present in a S3 bucket.
    S3 being a key/value it does not support folders. The path is just a key
    a resource.

    :param bucket_key: The key being waited on. Supports full s3:// style url
        or relative path from root level.
    :type bucket_key: str
    :param bucket_name: Name of the S3 bucket
    :type bucket_name: str
    :param wildcard_match: whether the bucket_key should be interpreted as a
        Unix wildcard pattern
    :type wildcard_match: bool
    :param aws_conn_id: a reference to the s3 connection
    :type aws_conn_id: str
    """
    template_fields = ('bucket_key', 'bucket_name')

    @apply_defaults
    def __init__(
            self, bucket_key,
            bucket_name=None,
            wildcard_match=False,
            aws_conn_id='aws_default',
            *args, **kwargs):
        super(S3KeySensor, self).__init__(*args, **kwargs)
        # Parse
        if bucket_name is None:
            parsed_url = urlparse(bucket_key)
            if parsed_url.netloc == '':
                raise AirflowException('Please provide a bucket_name')
            else:
                bucket_name = parsed_url.netloc
                if parsed_url.path[0] == '/':
                    bucket_key = parsed_url.path[1:]
                else:
                    bucket_key = parsed_url.path
        self.bucket_name = bucket_name
        self.bucket_key = bucket_key
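The error follows from when Jinja rendering happens. Airflow renders an operator's template_fields just before the task runs, not when the operator object is constructed while the DAG file is parsed, so __init__ receives the literal '{{ ... }}' string. A minimal check with Python 3's urllib.parse shows what urlparse makes of that string compared with a rendered location (the s3:// URL below is a made-up example value):

```python
from urllib.parse import urlparse

# The literal string S3KeySensor.__init__ receives: Jinja has not run yet
# at DAG-parse time, so this is exactly what gets passed to urlparse.
UNRENDERED = '{{ ti.xcom_pull(task_ids="run_query",key="query_result_loc") }}'

# A hypothetical value the template might render to at run time.
RENDERED = "s3://my-bucket/results/part-0000.csv"

for label, key in (("unrendered", UNRENDERED), ("rendered", RENDERED)):
    parsed = urlparse(key)
    # An empty netloc is what triggers 'Please provide a bucket_name'.
    print(label, repr(parsed.netloc), repr(parsed.path))
```

Because the netloc is empty for the unrendered string, the raise fires while the DAG file is being parsed, which is consistent with the "Broken DAG" error described in the update.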
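It looks like the template has not been rendered yet at this stage (inside __init__).
If I comment out the if block, it works fine. Is this a bug, or am I using template fields incorrectly?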
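If you want to keep passing a templated full s3:// URL, one option is to move the parsing out of __init__ and run it after rendering, for example from the poke() method of a small S3KeySensor subclass. Below is a sketch of a parsing helper such a subclass could call. resolve_bucket_and_key and MissingBucketError are hypothetical names (the error stands in for AirflowException), and the subclass itself is not shown, since how it sidesteps the parent's __init__ check depends on the Airflow version.

```python
from urllib.parse import urlparse


class MissingBucketError(ValueError):
    """Hypothetical error type standing in for AirflowException."""


def resolve_bucket_and_key(bucket_key, bucket_name=None):
    """Split an S3 location into (bucket_name, key).

    Mirrors the parsing in S3KeySensor.__init__, but is meant to be called
    after template rendering (e.g. from poke()), when bucket_key already
    holds a concrete value such as 's3://bucket/path'.
    """
    if bucket_name is not None:
        # Bucket given explicitly: use the key unchanged, as __init__ does.
        return bucket_name, bucket_key
    parsed = urlparse(bucket_key)
    if not parsed.netloc:
        raise MissingBucketError("Please provide a bucket_name")
    # Drop one leading slash from the URL path to get the S3 key.
    key = parsed.path[1:] if parsed.path.startswith("/") else parsed.path
    return parsed.netloc, key
```

Inside poke(), the subclass would call something like resolve_bucket_and_key(self.bucket_key, self.bucket_name) on the already-rendered attributes before checking S3.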
Update based on @kaxil's comment:
- With bucket_name not provided and the 'if' block not commented out, Airflow cannot even detect the DAG. In the UI I see this error:
Broken DAG: [/XXXX/YYYY/project_airflow.py] Please provide a bucket_name
- With bucket_name not provided, but with the following change to the 'if' block (the if parsed_url.netloc == '' check removed), it works fine:
if bucket_name is None:
    parsed_url = urlparse(bucket_key)
    bucket_name = parsed_url.netloc
    if parsed_url.path[0] == '/':
        bucket_key = parsed_url.path[1:]
    else:
        bucket_key = parsed_url.path
- With bucket_name provided, it works fine, and the rendered values of bucket_key and bucket_name show up correctly under the Rendered Template tab.
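The modified if block works because, for the unrendered template string, it no longer fails: bucket_name becomes '' and bucket_key keeps the whole '{{ ... }}' string, which Jinja can then render before the task runs. The sketch below replays that modified logic in plain Python; modified_init_parse is a hypothetical name for illustration. What happens at poke time with an empty bucket_name is an assumption here: presumably the S3 hook falls back to parsing the rendered s3:// URL itself, which is worth checking against the S3Hook source for 1.9.

```python
from urllib.parse import urlparse

TEMPLATE = '{{ ti.xcom_pull(task_ids="run_query",key="query_result_loc") }}'


def modified_init_parse(bucket_key, bucket_name=None):
    """Replays the modified __init__ logic, with the netloc == '' check removed."""
    if bucket_name is None:
        parsed_url = urlparse(bucket_key)
        bucket_name = parsed_url.netloc
        if parsed_url.path[0] == '/':
            bucket_key = parsed_url.path[1:]
        else:
            bucket_key = parsed_url.path
    return bucket_name, bucket_key


name, key = modified_init_parse(TEMPLATE)
print(repr(name))       # '' : no bucket can be parsed from the raw template
print(key == TEMPLATE)  # True: the template string survives for later rendering
```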
1 Answer
You've probably worked this out by now, but it's because you didn't provide an array for task_ids. The format should be
task_ids=["run_query"]
Changing it to this solved my problem.