首页 文章

在Beam管道中以编程方式生成BigQuery模式

提问于
浏览
2

我有一个同类dicts的集合,如何在不知道架构的情况下将它们写入BigQuery?

BigQuerySink要求我在构造架构时指定架构 . 但是,我不知道架构:它是由我正在尝试编写的词组的键来定义的 .

有没有办法让我的管道推断出架构,然后将它(作为侧面输入?)提供给接收器?

例如:

# Create a PCollection of dicts, something like
# {'field1': 'myval', 'field2': 10}
data = (p | 'generate_data' >> beam.ParDo(CreateData())

# Infer the schema from the data
# Generates a string for each element (ok to assume all dict keys equal)
# "field1:STRING, field2:INTEGER"
schema = (data
  | 'infer_schema' >> beam.ParDo(InferSchema())
  | 'sample_one'   >> beam.combiners.Sample.FixedSizeGlobally(1))

但是,如何将模式作为参数提供给BigQuerySink,并在beam.io.Write中使用它?

我知道这不正确,但我想做的是:

sink = BigQuerySink(tablename, dataset, project, schema=Materialize(schema))
p | 'write_bigquery' >> beam.io.Write(sink)

tl; dr有没有办法从apache beam创建和编写一个bigquery表,以编程方式从数据中推断出架构?

2 回答

  • 0

    假设您的架构可以经常更改,您可以更好地将数据保持为更通用的形式 .

    例如,您的行可能包含单个重复记录(您的字典条目) .

    记录模式如下所示:key(STRING)|可选的string_val(STRING)|可选的int_val(INTEGER)可选的double_val(DOUBLE)|可选的boolean_val(BOOLEAN)| ...

    然后,您可以编写按类型扫描记录的查询 . 这样效率稍差(因为如果它们位于不同的列中,您将不得不扫描可能跳过的行),但完全避免预先指定您的架构 .

  • 0

    现在,我提出的最佳解决方案是显式地硬编码dict键到BigQuery架构的映射 . 两个好处 - 它可以解决必须指定的模式问题,它允许我从BigQuery中不需要的字典中过滤掉元素 .

    SCHEMA = {
      'field1': 'INTEGER',
      'field2': 'STRING',
      ...
    }
    schema_str = ','.join(['%s:%s' % (k, v) for k,v in SCHEMA.iteritems()])
    
    sink = BigQuerySink(tablename,
            dataset=dataset,
            project=project,
            schema=schema_str,
            write_disposition=BigQueryDisposition.WRITE_TRUNCATE)
    
    (pipeline
      # filters just the keys of each dict to the keys of SCHEMA.
      | 'filter_fields' >> beam.ParDo(FilterFieldKeysDoFn(SCHEMA))
      | 'to_bigquery' >> beam.io.Write(sink))
    

相关问题