首页 文章

气流mysql到gcp Dag错误

提问于
浏览
0

我最近开始使用Airflow . 我正在研究DAG:

  • 查询MySQL数据库

  • 解压缩查询并将其作为JSON文件存储在 Cloud 存储桶中

  • 将存储的JSON文件上传到BigQuery

Dag进口三个运营商: MySqlOperatorMySqlToGoogleCloudStorageOperatorGoogleCloudStorageToBigQueryOperator

我使用的是Airflow 1.8.0,Python 3和Pandas 0.19.0 .

这是我的Dag代码:

sql2gcp_csv = MySqlToGoogleCloudStorageOperator(

    task_id='sql2gcp_csv',
    sql='airflow_gcp/aws_sql_extract_7days.sql',
    bucket='gs://{{var.value.gcs_bucket}}/{{ ds_nodash }}/',
    filename='{{ ds_nodash }}-account-*.json',
    schema_filename='support/file.json',
    approx_max_file_size_bytes=1900000000,
    mysql_conn_id='aws_mysql',
    google_cloud_storage_conn_id='airflow_gcp',

)

但是,当我运行它时,我收到以下错误:

[2017-07-20 22:38:07,478] {models.py:1441} INFO - Marking task as FAILED. 

[2017-07-20 22:38:07,490] {models.py:1462} ERROR - a bytes-like object is required, not 'str'

/home/User/airflow/workspace/env/lib/python3.5/site-packages/airflow/models.py:1927: PendingDeprecationWarning: Invalid arguments were passed to MySqlOperator. Support for passing such arguments will be dropped in Airflow 2.0. Invalid arguments were:
*args: ()
**kwargs: {'database': 'test'}
category=PendingDeprecationWarning

/home/User/airflow/workspace/env/lib/python3.5/site-
packages/airflow/ti_deps/deps/base_ti_dep.py:94: PendingDeprecationWarning: generator '_get_dep_statuses' raised StopIteration

for dep_status in self._get_dep_statuses(ti, session, dep_context):
Traceback (most recent call last):

File "/home/User/airflow/workspace/env/bin/airflow", line 28, in <module> args.func(args)

File "/home/User/airflow/workspace/env/lib/python3.5/site-packages/airflow/bin/cli.py", line 422, in run pool=args.pool,

File "/home/User/airflow/workspace/env/lib/python3.5/site-packages/airflow/utils/db.py", line 53, in wrapper result = func(*args, **kwargs)

File "/home/User/airflow/workspace/env/lib/python3.5/site-packages/airflow/models.py", line 1374, in run result = task_copy.execute(context=context)

File "/home/User/airflow/workspace/env/lib/python3.5/site-packages/airflow/contrib/operators/mysql_to_gcs.py", line 91, in execute files_to_upload = self._write_local_data_files(cursor)

File "/home/User/airflow/workspace/env/lib/python3.5/site-packages/airflow/contrib/operators/mysql_to_gcs.py", line 136, in _write_local_data_files 
json.dump(row_dict, tmp_file_handle)

File "/usr/lib/python3.5/json/__init__.py", line 179, in dump 

TypeError: a bytes-like object is required, not 'str'

有谁知道为什么抛出这个异常?

1 回答

  • 1

    根据你的追溯,你的代码在this point处破裂 . 如您所见,它处理代码:

    json.dump(row_dict, tmp_file_handle)
    

    tmp_file_handle 是带有默认输入参数的 NamedTemporaryFile initialized,也就是说,它模拟用 w+b 模式打开的文件(因此只接受字节数据作为输入) .

    问题是在Python 2中,所有字符串都是字节,而在Python 3中,字符串是文本(默认编码为 utf-8 ) .

    如果您打开Python 2并运行此代码:

    In [1]: from tempfile import NamedTemporaryFile
    In [2]: tmp_f = NamedTemporaryFile(delete=True)
    In [3]: import json
    In [4]: json.dump({'1': 1}, tmp_f)
    

    它工作正常 .

    但是如果你打开一个Python 3并运行相同的代码:

    In [54]: from tempfile import NamedTemporaryFile
    In [55]: tmp_f = NamedTemporaryFile(delete=True)
    In [56]: import json
    In [57]: json.dump({'1': 1}, tmp_f)
    ---------------------------------------------------------------------------
    TypeError                                 Traceback (most recent call last)
    <ipython-input-57-81743b9013c4> in <module>()
    ----> 1 json.dump({'1': 1}, tmp_f)
    
    /usr/local/lib/python3.6/json/__init__.py in dump(obj, fp, skipkeys, ensure_ascii, check_circular, allow_nan, cls, indent, separators, default, sort_keys, **kw)
        178     # a debuggability cost
        179     for chunk in iterable:
    --> 180         fp.write(chunk)
        181 
        182 
    
    /usr/local/lib/python3.6/tempfile.py in func_wrapper(*args, **kwargs)
        481             @_functools.wraps(func)
        482             def func_wrapper(*args, **kwargs):
    --> 483                 return func(*args, **kwargs)
        484             # Avoid closing the file as long as the wrapper is alive,
        485             # see issue #18879.
    
    TypeError: a bytes-like object is required, not 'str'
    

    我们得到与您相同的错误 .

    这意味着Python 3仍然不完全支持Airflow(正如您在test coverage中看到的那样,模块 airflow/contrib/operators/mysql_to_gcs.py 尚未在python 2或3中进行测试) . 确认这一点的一种方法是使用python 2运行代码并查看它是否有效 .

    我建议在their JIRA上创建一个问题,请求两个版本的Python的可移植性 .

相关问题