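I wrote a Python Dataflow job that reads data from a csv file and uses it to populate a BigQuery table. However, every time I run the job, the same error comes up. If I remove the part that writes to BigQuery and write to a file instead, the code runs fine and the rows are written to the output file as dicts. The code is as follows: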
import argparse
import logging
import re
import apache_beam as beam
from apache_beam.io import ReadFromText
from apache_beam.io import WriteToText
from apache_beam.metrics import Metrics
from apache_beam.metrics.metric import MetricsFilter
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
import json
from apache_beam.io.gcp.bigquery import TableRowJsonCoder
class ToTableRowDoFn(beam.DoFn):
    def process(self, x):
        values = x.split(',')
        rows = {}
        rows["Name"] = values[0]
        rows["Place_of_Birth"] = values[1]
        rows["Age"] = values[2]
        return [rows]
parser = argparse.ArgumentParser()
parser.add_argument('--input',
                    dest='input',
                    default='gs://dataflow-samples/shakespeare/kinglear.txt',
                    help='Input file to process.')
parser.add_argument('--output',
                    dest='output',
                    help='Output file to write results to.')
known_args, pipeline_args = parser.parse_known_args(None)
pipeline_options = PipelineOptions(pipeline_args)
pipeline_options.view_as(SetupOptions).save_main_session = True
p = beam.Pipeline(options=pipeline_options)
# Read the text file[pattern] into a PCollection.
lines = p | 'read' >> ReadFromText(known_args.input)
(lines
 | 'ToTableRows' >> beam.ParDo(ToTableRowDoFn())
 | 'write' >> beam.io.Write(beam.io.BigQuerySink(
     'xxxx:ZZZZZZ.YYYYY',
     schema='Name:STRING, Place_of_Birth:STRING, Age:STRING')))
# Actually run the pipeline (all operations above are deferred).
result = p.run()
I am loading the following csv file:
Name1,Place1,40
Name2,Place2,20
The error I get when I run this code on the csv file is:
AttributeError: 'FieldList' object has no attribute '_FieldList__field'
If I remove the part that writes to BigQuery and write to a file instead, the code works fine. Please help me resolve this.
1 Answer
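I had the same problem; posting this for anyone else who lands on this thread. It is related to pickling. You have to disable the save_main_session option. I just commented it out in my pipeline options to test. https://issues.apache.org/jira/browse/BEAM-3134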
https://cloud.google.com/dataflow/faq#how-do-i-handle-nameerrors
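For illustration, here is a minimal sketch of what the pipeline might look like with save_main_session left at its default (not set to True). It is not the asker's exact code. The names to_table_row and run are hypothetical, and beam.io.WriteToBigQuery is swapped in for the BigQuerySink from the question as an assumption. The csv import sits inside the function, in line with the FAQ's advice on importing locally when the main session is not saved. Whether this avoids the error may depend on your Beam version.

```python
def to_table_row(line):
    """Turn one CSV line into a dict keyed by the BigQuery column names."""
    # Imported locally so the function does not depend on the main
    # session being pickled and restored on the workers.
    import csv
    # Assumes exactly three fields per line, as in the sample file.
    name, place_of_birth, age = next(csv.reader([line]))
    return {"Name": name, "Place_of_Birth": place_of_birth, "Age": age}


def run(input_path, table="xxxx:ZZZZZZ.YYYYY", pipeline_args=None):
    """Hypothetical entry point; the table name is the question's placeholder."""
    # Beam is imported here so the parsing helper can be tried without it.
    import apache_beam as beam
    from apache_beam.io import ReadFromText
    from apache_beam.options.pipeline_options import PipelineOptions

    # Note: save_main_session is NOT set to True here, which is the
    # change the answer suggests.
    options = PipelineOptions(pipeline_args)
    with beam.Pipeline(options=options) as p:
        (
            p
            | "read" >> ReadFromText(input_path)
            | "ToTableRows" >> beam.Map(to_table_row)
            | "write" >> beam.io.WriteToBigQuery(
                table,
                schema="Name:STRING,Place_of_Birth:STRING,Age:STRING",
            )
        )
```

The parsing helper can be checked on its own against a line from the sample file, for example to_table_row("Name1,Place1,40"), before running the whole pipeline.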