我的用例很简单:从Pub / Sub订阅中读取事件日志,解析它们并保存到BigQuery中 . 因为事件的数量预计会显着增长,并且我使用无界数据源,所以我决定在BigQuery中配置分片:根据事件数据的时间戳将事件存储到日常表中(在Beam文档中称为"event time") . 我的问题是我需要在我的情况下配置窗口,或者我可以保留隐式使用全局窗口的默认配置吗?我没有使用 GroupByKey
和 Combine
之类的任何分组操作的原因,看起来我应该没有任何窗口配置就好了 . 或者我有没有理由使用窗口,例如它可能会影响 BigQueryIO
的表现如何?
我现在的分片方式如下 .
static class TableNamingFn implements SerializableFunction<ValueInSingleWindow<TableRow>, TableDestination> {
@Override
public TableDestination apply(ValueInSingleWindow<TableRow> input) {
DateTimeFormatter formatter = DateTimeFormat.forPattern("yyyyMMdd").withZone(DateTimeZone.UTC);
TableReference reference = new TableReference();
reference.setProjectId("test-project");
reference.setDatasetId("event_log");
DateTime timestamp = new DateTime(input.getValue().get("event_timestamp"), DateTimeZone.UTC);
reference.setTableId("events_" + formatter.print(timestamp));
return new TableDestination(reference, null);
}
}
// And then
eventRows.apply("BigQueryWrite", BigQueryIO.writeTableRows()
.to(new TableNamingFn())
.withSchema(EventSchema)
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
1 回答
看起来您正在尝试按日期对表进行分片,您是否考虑过使用Date-partitioned Table . 您可以使用分区装饰器更新设置表ID的位置,例如:
This article涵盖了使用BigQuery的分区表和Apache Beam . 特别是这段代码可能就是你想要使用的:https://gist.githubusercontent.com/alexvanboxel/902099911d86b6827c8ea07f4e1437d4/raw/cc8246eb9b3219550379cfe7b3b7abca8fc77401/medium_bq_tableref_partition