首页 文章

如何将表行PCollections转换为Python中的键值PCollections?

提问于
浏览
3

没有关于如何将pCollections转换为输入到.CoGroupByKey()所需的pCollections的文档

上下文基本上我有两个大的pCollections,我需要能够找到两者之间的差异,对于第二类ETL更改(如果它在pColl1中不存在,那么添加到pColl2中的嵌套字段),这样我就能够从BigQuery保留这些记录的历史记录 .

管道架构:

  • 将BQ表读入2个pCollections:dwsku和product .

  • 将CoGroupByKey()应用于两组以返回 - >结果

  • 解析结果以查找dwsku中的所有更改并将其嵌套到产品中 .

建议任何帮助 . 我在SO上发现了一个java链接,它做了我需要完成的同样的事情(但是在Python SDK上没有任何内容) .

Convert from PCollection<TableRow> to PCollection<KV<K,V>>

是否有Apache Beam的文档/支持,特别是Python SDK?

1 回答

  • 5

    为了使 CoGroupByKey() 工作,你需要 PCollectionstuples ,其中第一个元素是 key ,第二个元素是 the data .

    在你的情况下,你说你有 BigQuerySource ,它在当前版本的Apache Beam输出 PCollection of dictionariescode),其中每个条目代表表中读取的一行 . 您需要将此PCollections映射到元组,如上所述 . 这很容易使用 ParDo

    class MapBigQueryRow(beam.DoFn):
        def process(self, element, key_column):
            key = element.get(key_column)
            yield key, element
    
    
    data1 = (p
                | "Read #1 BigQuery table" >> beam.io.Read(beam.io.BigQuerySource(query="your query #1"))
                | "Map #1 to KV" >> beam.ParDo(MapBigQueryRow(), key_column="KEY_COLUMN_IN_TABLE_1"))
    
    data2 = (p
                | "Read #2 BigQuery table" >> beam.io.Read(beam.io.BigQuerySource(query="your query #2"))
                | "Map #2 to KV" >> beam.ParDo(MapBigQueryRow(), key_column="KEY_COLUMN_IN_TABLE_2"))
    
    co_grouped = ({"data1": data1, "data2": data2} | beam.CoGroupByKey())
    
    # do your processing with co_grouped here
    

    顺便说一句,可以找到用于Apache Beam的Python SDK文档here .

相关问题