我正在尝试为Google Cloud Dataflow创建自己的模板,以便可以从GUI执行作业,从而使其他人更容易执行 . 我已经按照教程,创建了自己的PipelineOptions类,并使用parser.add_value_provider_argument()方法填充它 . 当我尝试将这些参数传递到管道中时,使用my_options.argname.get(),我收到一个错误,告诉我该项未从运行时上下文中调用 . 我不明白这一点 . args不是定义管道图本身的一部分,它们只是输入文件名,输出表名等参数 .
以下是代码 . 如果我硬编码输入文件名,输出表名,写处置和分隔符,它就可以工作 . 如果我用它们的my_options.argname.get()等效替换它们,它就会失败 . 在显示的代码片段中,我已经硬编码除outputBQTable名称之外的所有内容,其中我使用my_options.outputBQTable.get() . 此操作失败,并显示以下消息 .
apache_beam.error.RuntimeValueProviderError:RuntimeValueProvider(选项:outputBQTable,类型:str,default_value:'dataflow_csv_reader_testing.names') . get()未从运行时上下文中调用
我很欣赏任何有关如何使其发挥作用的指导 .
import apache_beam
from apache_beam.io.gcp.gcsio import GcsIO
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.value_provider import RuntimeValueProvider
import csv
import argparse
class MyOptions(PipelineOptions):
@classmethod
def _add_argparse_args(cls,parser):
parser.add_value_provider_argument('--inputGCS', type=str,
default='gs://mybucket/df-python-csv-test/test-dict.csv',
help='Input gcs csv file, full path and filename')
parser.add_value_provider_argument('--delimiter', type=str,
default=',',
help='Character used as delimiter in csv file, default is ,')
parser.add_value_provider_argument('--outputBQTable', type=str,
default='dataflow_csv_reader_testing.names',
help='Output BQ Dataset.Table to write to')
parser.add_value_provider_argument('--writeDisposition', type=str,
default='WRITE_APPEND',
help='BQ write disposition, WRITE_TRUNCATE or WRITE_APPEND or WRITE_EMPTY')
def main():
optlist=PipelineOptions()
my_options=optlist.view_as(MyOptions)
p = apache_beam.Pipeline(options=optlist)
(p
| 'create' >> apache_beam.Create(['gs://mybucket/df-python-csv-test/test-dict.csv'])
| 'read gcs csv dict' >> apache_beam.FlatMap(lambda file: csv.DictReader(apache_beam.io.gcp.gcsio.GcsIO().open(file,'r'), delimiter='|'))
| 'write bq record' >> apache_beam.io.Write(apache_beam.io.BigQuerySink(my_options.outputBQTable.get(), write_disposition='WRITE_TRUNCATE'))
)
p.run()
if __name__ == '__main__':
main()
2 回答
这是一个尚未为Python SDK开发的功能 .
相关的open issue可以在Apache Beam项目页面找到 .
在上述问题得到解决之前,现在的解决方法是使用Java SDK .
指定管道时,不能使用
my_options.outputBQTable.get()
. BigQuery接收器已经知道如何使用运行时提供的参数,所以我认为你可以传递my_options.outputBQTable
.根据我从文档中收集的内容,您只应在
DoFn
的process
方法中使用options.runtime_argument.get()
传递给ParDo
步骤 .注意:我使用2.8.0的Apache Beam SDK进行了测试,因此我使用了
WriteToBigQuery
而不是BigQuerySink
.