假设我有 PCollection<Foo> 并且我想将它写入多个BigQuery表,为每个 Foo 选择一个可能不同的表 .
PCollection<Foo>
Foo
如何使用Apache Beam BigQueryIO API执行此操作?
BigQueryIO
这可以使用最近添加到Apache Beam中的 BigQueryIO 的功能 .
PCollection<Foo> foos = ...; foos.apply(BigQueryIO.write().to(new SerializableFunction<ValueInSingleWindow<Foo>, TableDestination>() { @Override public TableDestination apply(ValueInSingleWindow<Foo> value) { Foo foo = value.getValue(); // Also available: value.getWindow(), getTimestamp(), getPane() String tableSpec = ...; String tableDescription = ...; return new TableDestination(tableSpec, tableDescription); } }).withFormatFunction(new SerializableFunction<Foo, TableRow>() { @Override public TableRow apply(Foo foo) { return ...; } }).withSchema(...));
根据输入 PCollection<Foo> 是有界还是无界,这将导致创建多个BigQuery导入作业(每个表一个或多个,具体取决于数据量),或者它将使用BigQuery流插入API .
最灵活的API版本使用 DynamicDestinations ,它允许您使用不同的模式将不同的值写入不同的表,甚至允许您在所有这些计算中使用来自管道其余部分的侧输入 .
DynamicDestinations
此外,BigQueryIO已被重构为许多可重用的转换,您可以自己组合以实现更复杂的用例 - 请参阅files in the source directory .
此功能将包含在Apache Beam的第一个稳定版本中,并将包含在Dataflow SDK的下一个版本中(将基于Apache Beam的第一个稳定版本) . 现在你可以通过在github上对HEAD的梁的快照运行你的管道来使用它 .
1 回答
这可以使用最近添加到Apache Beam中的
BigQueryIO
的功能 .根据输入
PCollection<Foo>
是有界还是无界,这将导致创建多个BigQuery导入作业(每个表一个或多个,具体取决于数据量),或者它将使用BigQuery流插入API .最灵活的API版本使用
DynamicDestinations
,它允许您使用不同的模式将不同的值写入不同的表,甚至允许您在所有这些计算中使用来自管道其余部分的侧输入 .此外,BigQueryIO已被重构为许多可重用的转换,您可以自己组合以实现更复杂的用例 - 请参阅files in the source directory .
此功能将包含在Apache Beam的第一个稳定版本中,并将包含在Dataflow SDK的下一个版本中(将基于Apache Beam的第一个稳定版本) . 现在你可以通过在github上对HEAD的梁的快照运行你的管道来使用它 .