
UnicodeDecodeError with MySQL to Cloud Storage Bucket Airflow DAG


I have created a DAG that extracts MySQL data from a database and loads it to Cloud Storage, then into BigQuery, as JSON files.

The DAG works for some tables but not all of them, because it can't decode some of the characters in the tables. It's quite a lot of data, so I can't pinpoint exactly where the erroneous or invalid characters are.

I have tried changing my database, table, and column character sets from utf8 to utf8mb4. That didn't help.

I have also tried passing encoding='utf-8' as well as 'iso-8859-1', but I don't think I'm applying them correctly, because I've been setting them on my connection and I still get the same error.

I am running Python 2.7.12 and Airflow v1.8.0.
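
A minimal sketch of one way to locate the offending rows, assuming the MySQLdb driver that Airflow's MySQL hook uses; the column names and credentials below are placeholders, not from the actual schema:

# Hypothetical helper: scan a table for values that are not valid UTF-8,
# so bad bytes (like 0x96) can be pinpointed. Python 2.7.
import MySQLdb

conn = MySQLdb.connect(host='127.0.0.1', user='airflow', passwd='...',
                       db='podiodb')  # placeholder credentials
cur = conn.cursor()
cur.execute("SELECT id, description FROM finance_banking_details")  # placeholder columns
for row_id, value in cur.fetchall():
    if isinstance(value, str):
        try:
            value.decode('utf-8')
        except UnicodeDecodeError as exc:
            print 'row %s: %r (%s)' % (row_id, value, exc)
cur.close()
conn.close()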

UPDATE: After reading https://cwiki.apache.org/confluence/display/AIRFLOW/Common+Pitfalls, which suggests using a connection string that defines the charset, e.g.: sql_alchemy_conn = mysql://airflow@localhost:3306/airflow?charset=utf8

How can this be done with a Cloud SQL instance?

# Imports implied by the operators below (Airflow 1.8-era module paths)
from datetime import datetime, timedelta

from airflow import DAG
from airflow.contrib.operators.gcs_to_bq import GoogleCloudStorageToBigQueryOperator
from airflow.contrib.operators.mysql_to_gcs import MySqlToGoogleCloudStorageOperator
from airflow.operators.slack_operator import SlackAPIPostOperator

podio_connections = [
    'mysql_connection'
]

podio_tables = [
     'finance_banking_details',
     'finance_goods_invoices',
]

default_args = {
    'owner': 'xxxxxx',
    'start_date': datetime(2018,1,11),
    'depends_on_past': False,
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG('mysql_to_bigquery', default_args=default_args, schedule_interval='@daily')

slack_notify = SlackAPIPostOperator(
    task_id='slack_notify',
    token='xxxxxx',
    channel='data-status',
    username='airflow',
    text='Successfully performed Podio ETL operation',
    dag=dag)

for connection in podio_connections:
    for table in podio_tables:
        extract = MySqlToGoogleCloudStorageOperator(
            task_id="extract_mysql_%s_%s"%(connection,table),
            mysql_conn_id=connection,
            google_cloud_storage_conn_id='gcp_connection',
            sql="SELECT *, '%s' as source FROM podiodb.%s"%(connection,table),
            bucket='podio-reader-storage',
            filename="%s/%s/%s{}.json"%(connection,table,table),
            schema_filename="%s/schemas/%s.json"%(connection,table),
            dag=dag)

        load = GoogleCloudStorageToBigQueryOperator(
            task_id="load_bg_%s_%s"%(connection,table),
            bigquery_conn_id='gcp_connection',
            google_cloud_storage_conn_id='gcp_connection',
            bucket='podio-reader-storage',
            #destination_project_dataset_table="podio-data.%s.%s"%(connection,table),
            destination_project_dataset_table = "podio-data.podio_data1.%s"%(table),
            source_objects=["%s/%s/%s*.json"%(connection,table,table)],
            schema_object="%s/schemas/%s.json"%(connection,table),
            source_format='NEWLINE_DELIMITED_JSON',
            write_disposition='WRITE_TRUNCATE',
            dag=dag)

        load.set_upstream(extract)
        slack_notify.set_upstream(load)

[2018-01-12 15:36:10,221] ERROR - 'utf8' codec can't decode byte 0x96 in position 36: invalid start byte
Traceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/airflow/models.py", line 1374, in run
    result = task_copy.execute(context=context)
  File "/usr/local/lib/python2.7/dist-packages/airflow/contrib/operators/mysql_to_gcs.py", line 91, in execute
    files_to_upload = self._write_local_data_files(cursor)
  File "/usr/local/lib/python2.7/dist-packages/airflow/contrib/operators/mysql_to_gcs.py", line 136, in _write_local_data_files
    json.dump(row_dict, tmp_file_handle)
  File "/usr/lib/python2.7/json/__init__.py", line 189, in dump
    for chunk in iterable:
  File "/usr/lib/python2.7/json/encoder.py", line 434, in _iterencode
    for chunk in _iterencode_dict(o, _current_indent_level):
  File "/usr/lib/python2.7/json/encoder.py", line 390, in _iterencode_dict
    yield _encoder(value)
UnicodeDecodeError: 'utf8' codec can't decode byte 0x96 in position 36: invalid start byte
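
The same failure can be reproduced in plain Python 2.7 outside Airflow; a minimal sketch, with the dict value standing in for a latin1-encoded row field:

# json.dump assumes str values are UTF-8 and decodes them while encoding,
# which is the call that fails inside _write_local_data_files.
import json
import tempfile

row_dict = {'notes': 'a \x96 b'}      # 0x96: en-dash in MySQL's latin1 (cp1252)
tmp_file_handle = tempfile.NamedTemporaryFile()
json.dump(row_dict, tmp_file_handle)  # UnicodeDecodeError: invalid start byte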

1 Answer

  • 2

    96 is the latin1 hex for an "en-dash". Either change the data to utf8, or change the connection to MySQL to say that you are using charset latin1.
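
    One way to apply the second suggestion in Airflow is through the connection's Extra JSON: MySqlHook reads a "charset" key from the extras and passes it to MySQLdb, which is the per-connection equivalent of the ?charset= query string mentioned in the question. A sketch under that assumption, using the conn_id from the DAG above:

    # Sketch: set charset on the existing Airflow connection's Extra field.
    from airflow import settings
    from airflow.models import Connection

    session = settings.Session()
    conn = (session.query(Connection)
                   .filter(Connection.conn_id == 'mysql_connection')
                   .one())
    conn.extra = '{"charset": "latin1"}'  # or "utf8mb4" once the data is converted
    session.commit()

    The same JSON can instead be pasted into the Extra box for that connection in the Airflow web UI.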
