Error when writing data from CSV to BigQuery

I wrote a Python Dataflow job that reads data from a CSV file and uses it to populate a BigQuery table. However, whenever I run the job, an error keeps coming up. If I remove the BigQuery write step and write to a file instead, the code runs fine and the rows are written to the output file as dicts. The code is as follows:

import argparse
import logging
import re
import apache_beam as beam
from apache_beam.io import ReadFromText
from apache_beam.io import WriteToText
from apache_beam.metrics import Metrics
from apache_beam.metrics.metric import MetricsFilter
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
import json
from apache_beam.io.gcp.bigquery import TableRowJsonCoder
class ToTableRowDoFn(beam.DoFn):
  def process(self,x):
    values = x.split(',')
    rows={}
    rows["Name"]=values[0]
    rows["Place_of_Birth"]=values[1]
    rows["Age"]=values[2]
    return [rows]
parser = argparse.ArgumentParser()
parser.add_argument('--input',
                  dest='input',
                  default='gs://dataflow-samples/shakespeare/kinglear.txt',
                  help='Input file to process.')
parser.add_argument('--output',
                  dest='output',   
                  help='Output file to write results to.')
known_args, pipeline_args = parser.parse_known_args(None)
pipeline_options = PipelineOptions(pipeline_args)
pipeline_options.view_as(SetupOptions).save_main_session = True
p = beam.Pipeline(options=pipeline_options)
# Read the text file[pattern] into a PCollection.

lines = p | 'read' >> ReadFromText(known_args.input)
(lines
 | 'ToTableRows' >> beam.ParDo(ToTableRowDoFn())
 | 'write' >> beam.io.Write(beam.io.BigQuerySink(
     'xxxx:ZZZZZZ.YYYYY',
     schema='Name:STRING, Place_of_Birth:STRING, Age:STRING')))

# Actually run the pipeline (all operations above are deferred).
result = p.run()

I am loading the following CSV file:

Name1,Place1,40
Name2,Place2,20

The error I get when running this code on the CSV file is:

AttributeError: 'FieldList' object has no attribute '_FieldList__field'

If I remove the write-to-BigQuery part and write to a file instead, the code works fine. Please help me resolve this issue.
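To confirm the failure is at the BigQuery sink rather than in the row parsing, the conversion logic from `ToTableRowDoFn` can be checked on the sample lines without Beam at all. This is a minimal standalone sketch that mirrors the `process()` method; the function name `to_table_row` is introduced here for illustration only:

```python
# Standalone check of the CSV-to-dict conversion done by ToTableRowDoFn.
# It replicates the process() logic without apache_beam, so it only
# verifies the parsing step, not the BigQuery write.

def to_table_row(line):
    """Split one CSV line into the dict shape the BigQuery schema expects."""
    values = line.split(',')
    return {
        "Name": values[0],
        "Place_of_Birth": values[1],
        "Age": values[2],
    }

sample = ["Name1,Place1,40", "Name2,Place2,20"]
rows = [to_table_row(line) for line in sample]
print(rows)
```

If this prints the expected dicts for both sample rows, the DoFn itself is fine and the `AttributeError` is being raised somewhere in the sink/pickling path, which matches the observation that writing to a text file works.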
