首页 文章

从Dataflow在BigQuery中插入数据

提问于
浏览
0

以前,有过PCollection的格式化结果;我正在使用下面的代码在大查询中插入行:

// OPTION 1
PCollection<TableRow> formattedResults = ....
formattedResults.apply(BigQueryIO.Write.named("Write").to(tableName)
                            .withSchema(tableSchema)
                            .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
                            .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));

所有行都直接插入BigQuery,直到这里都很好 . 但是现在我已经开始动态识别表名及其行,所以我创建了PCollection,如下所示:( String将是表名,然后是行作为值)

PCollection<KV<String, TableRow>>   tableRowMap // OPTION 2

此外,我正在创建一组行,这些行将在同一个表中:

PCollection<KV<String, Iterable<TableRow>>> groupedRows  //OPTION 3

其中key(String)是BQ表名,value是要在BQ中插入的行列表 .

使用选项1,我可以使用上面显示的代码轻松地在BQ中插入行,但是相同的代码不能与OPTION 2或OPTION 3一起使用,因为在这种情况下我的表名是map中的键 . 有没有办法使用OPTION 2或OPTION 3在表中插入行 . 任何链接或代码示例都将是很有帮助的 .

2 回答

  • -1

    Dataflow最接近每个窗口写一个表(并且您可以创建自己的BoundedWindow子类和WindowFn以包含您在窗口中需要的任何数据) . 为此,请使用

    to(SerializableFunction<BoundedWindow,String> tableSpecFunction)
    

    在BigQueryIO.Write上 .

    请注意,此功能使用BigQuery的流式上传功能,每个表限制为100MB / s . 此外,上载不是原子的,因此失败的批处理作业可能只上传部分输出 .

  • 1

    你've also got the option to create you'拥有直接将数据插入bigquery的DoFn,而不是依赖于BigQueryIO.Write . 从技术上讲,您需要创建一个 BigQueryTableInserter ,您可以使用 insertAll(TableReference ref, List<TableRow> rowList) 将内容插入到您想要的表中 .

    您可以使用以下内容创建TableReference: new TableReference().setProjectId("projectfoo").setDatasetId("datasetfoo").setTableId("tablefoo")

    这不是100%推荐,因为BigQueryIO做了一些很好的事情来分割需要插入的行以最大化吞吐量并正确处理重试 .

相关问题