首页 文章

Apache Beam中的DymanicDestinations

提问于
浏览
0

我有一个PCollection [String]说“X”我需要在BigQuery表中转储 . 表目的地和它的模式在PCollection [TableRow]中说“Y” . 如何以最简单的方式实现这一目标?

我尝试从“Y”中提取表和模式,并将其保存在静态全局变量(分别为tableName和schema)中 . 但奇怪的是,BigQueryIO.writeTableRows()总是将变量tableName的值变为null . 但它得到了架构 . 我尝试记录这些变量的值,我可以看到两者的值都存在 .

这是我的管道代码:

static String tableName;
static TableSchema schema;

PCollection<String> read = p.apply("Read from input file",
  TextIO.read().from(options.getInputFile()));

PCollection<TableRow> tableRows = p.apply(
  BigQueryIO.read().fromQuery(NestedValueProvider.of(
    options.getfilename(),
    new SerializableFunction<String, String>() {
         @Override
         public String apply(String filename) {
           return "SELECT table,schema FROM `BigqueryTest.configuration` WHERE file='" + filename +"'";
         }
    })).usingStandardSql().withoutValidation());

final PCollectionView<List<String>> dataView = read.apply(View.asList());

tableRows.apply("Convert data read from file to TableRow",
  ParDo.of(new DoFn<TableRow,TableRow>(){
    @ProcessElement
    public void processElement(ProcessContext c) {
      tableName = c.element().get("table").toString();
      String[] schemas = c.element().get("schema").toString().split(",");
      List<TableFieldSchema> fields = new ArrayList<>();
      for(int i=0;i<schemas.length;i++) {
        fields.add(new TableFieldSchema()
          .setName(schemas[i].split(":")[0]).setType(schemas[i].split(":")[1]));
      }
      schema = new TableSchema().setFields(fields);

      //My code to convert data to TableRow format.
    }}).withSideInputs(dataView)); 


tableRows.apply("write to BigQuery", 
  BigQueryIO.writeTableRows()
    .withSchema(schema)
    .to("ProjectID:DatasetID."+tableName)
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE)
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));

一切正常 . 只有BigQueryIO.write操作失败,我得到错误TableId为null .

我也尝试使用SerializableFunction并从那里返回值,但我仍然得到null .

这是我为它尝试的代码:

tableRows.apply("write to BigQuery",
BigQueryIO.writeTableRows()
  .withSchema(schema)
  .to(new GetTable(tableName))
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE)
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));

public static class GetTable implements SerializableFunction<String,String> {
  String table;

  public GetTable() {
    this.table = tableName;
  }

  @Override
  public String apply(String arg0) {
    return "ProjectId:DatasetId."+table;
  }
}

我也尝试使用DynamicDestinations但是我收到错误,说没有提供架构 . 老实说,我是DynamicDestinations概念的新手,我不确定我是否正确地做到了 .

这是我为它尝试的代码:

tableRows2.apply(BigQueryIO.writeTableRows()
  .to(new DynamicDestinations<TableRow, TableRow>() {
    private static final long serialVersionUID = 1L;
    @Override
    public TableDestination getTable(TableRow dest) {
      List<TableRow> list = sideInput(bqDataView); //bqDataView contains table and schema
      String table = list.get(0).get("table").toString();
      String tableSpec = "ProjectId:DatasetId."+table;
      String tableDescription = "";
      return new TableDestination(tableSpec, tableDescription);
    }

    public String getSideInputs(PCollectionView<List<TableRow>> bqDataView) {
      return null;
    }

    @Override
    public TableSchema getSchema(TableRow destination) {
      return schema;   //schema is getting added from the global variable
    }
    @Override
    public TableRow getDestination(ValueInSingleWindow<TableRow> element) {
      return null;
    }
}.getSideInputs(bqDataView)));

请让我知道我做错了什么,我应该走哪条路 .

谢谢 .

1 回答

  • 0

    您遇到麻烦的部分原因是由于管道执行的两个阶段 . 首先,管道在您的机器上构建 . 这是PTransforms的所有应用程序发生时 . 在第一个示例中,这是执行以下行的时间:

    BigQueryIO.writeTableRows()
      .withSchema(schema)
      .to("ProjectID:DatasetID."+tableName)
    

    然而,ParDo中的代码在管道执行时运行,并且在许多机器上执行 . 因此,以下代码的运行时间远晚于管道构造:

    @ProcessElement
    public void processElement(ProcessContext c) {
      tableName = c.element().get("table").toString();
      ...
      schema = new TableSchema().setFields(fields);
      ...
    }
    

    这意味着在创建BigQueryIO接收器时,不会设置tableName和schema字段 .

    您使用DynamicDestinations的想法是正确的,但您需要移动代码以实际生成该类的目标模式,而不是依赖于所有机器上都不可用的全局变量 .

相关问题