我有一个PCollection [String]说“X”我需要在BigQuery表中转储 . 表目的地和它的模式在PCollection [TableRow]中说“Y” . 如何以最简单的方式实现这一目标?
我尝试从“Y”中提取表和模式,并将其保存在静态全局变量(分别为tableName和schema)中 . 但奇怪的是,BigQueryIO.writeTableRows()总是将变量tableName的值变为null . 但它得到了架构 . 我尝试记录这些变量的值,我可以看到两者的值都存在 .
这是我的管道代码:
static String tableName;
static TableSchema schema;
PCollection<String> read = p.apply("Read from input file",
TextIO.read().from(options.getInputFile()));
PCollection<TableRow> tableRows = p.apply(
BigQueryIO.read().fromQuery(NestedValueProvider.of(
options.getfilename(),
new SerializableFunction<String, String>() {
@Override
public String apply(String filename) {
return "SELECT table,schema FROM `BigqueryTest.configuration` WHERE file='" + filename +"'";
}
})).usingStandardSql().withoutValidation());
final PCollectionView<List<String>> dataView = read.apply(View.asList());
tableRows.apply("Convert data read from file to TableRow",
ParDo.of(new DoFn<TableRow,TableRow>(){
@ProcessElement
public void processElement(ProcessContext c) {
tableName = c.element().get("table").toString();
String[] schemas = c.element().get("schema").toString().split(",");
List<TableFieldSchema> fields = new ArrayList<>();
for(int i=0;i<schemas.length;i++) {
fields.add(new TableFieldSchema()
.setName(schemas[i].split(":")[0]).setType(schemas[i].split(":")[1]));
}
schema = new TableSchema().setFields(fields);
//My code to convert data to TableRow format.
}}).withSideInputs(dataView));
tableRows.apply("write to BigQuery",
BigQueryIO.writeTableRows()
.withSchema(schema)
.to("ProjectID:DatasetID."+tableName)
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE)
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));
一切正常 . 只有BigQueryIO.write操作失败,我得到错误TableId为null .
我也尝试使用SerializableFunction并从那里返回值,但我仍然得到null .
这是我为它尝试的代码:
tableRows.apply("write to BigQuery",
BigQueryIO.writeTableRows()
.withSchema(schema)
.to(new GetTable(tableName))
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE)
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));
public static class GetTable implements SerializableFunction<String,String> {
String table;
public GetTable() {
this.table = tableName;
}
@Override
public String apply(String arg0) {
return "ProjectId:DatasetId."+table;
}
}
我也尝试使用DynamicDestinations但是我收到错误,说没有提供架构 . 老实说,我是DynamicDestinations概念的新手,我不确定我是否正确地做到了 .
这是我为它尝试的代码:
tableRows2.apply(BigQueryIO.writeTableRows()
.to(new DynamicDestinations<TableRow, TableRow>() {
private static final long serialVersionUID = 1L;
@Override
public TableDestination getTable(TableRow dest) {
List<TableRow> list = sideInput(bqDataView); //bqDataView contains table and schema
String table = list.get(0).get("table").toString();
String tableSpec = "ProjectId:DatasetId."+table;
String tableDescription = "";
return new TableDestination(tableSpec, tableDescription);
}
public String getSideInputs(PCollectionView<List<TableRow>> bqDataView) {
return null;
}
@Override
public TableSchema getSchema(TableRow destination) {
return schema; //schema is getting added from the global variable
}
@Override
public TableRow getDestination(ValueInSingleWindow<TableRow> element) {
return null;
}
}.getSideInputs(bqDataView)));
请让我知道我做错了什么,我应该走哪条路 .
谢谢 .
1 回答
您遇到麻烦的部分原因是由于管道执行的两个阶段 . 首先,管道在您的机器上构建 . 这是PTransforms的所有应用程序发生时 . 在第一个示例中,这是执行以下行的时间:
然而,ParDo中的代码在管道执行时运行,并且在许多机器上执行 . 因此,以下代码的运行时间远晚于管道构造:
这意味着在创建BigQueryIO接收器时,不会设置tableName和schema字段 .
您使用DynamicDestinations的想法是正确的,但您需要移动代码以实际生成该类的目标模式,而不是依赖于所有机器上都不可用的全局变量 .