首页 文章

通过Dataflow将大型gzip JSON文件从Google Cloud Storage读入BigQuery

提问于
浏览
1

我试图从Google Cloud 端存储(GCS)中读取大约90个gz压缩日志文件,每个大约2GB大(未压缩10 GB),解析它们,并通过Google Cloud Dataflow将它们写入BigQuery(BQ)的日期分区表( GCDF) .

每个文件保存7天的数据,整个日期范围约为2年(730天和计数) . 我当前的管道如下所示:

p.apply("Read logfile", TextIO.Read.from(bucket))
 .apply("Repartition", Repartition.of())
 .apply("Parse JSON", ParDo.of(new JacksonDeserializer()))
 .apply("Extract and attach timestamp", ParDo.of(new ExtractTimestamps()))
 .apply("Format output to TableRow", ParDo.of(new TableRowConverter()))
 .apply("Window into partitions", Window.into(new TablePartWindowFun()))
 .apply("Write to BigQuery", BigQueryIO.Write
         .to(new DayPartitionFunc("someproject:somedataset", tableName))
         .withSchema(TableRowConverter.getSchema())
         .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
         .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));

重新分区是我在尝试创建管道时所构建的内容reshuffle after decompressing,我尝试使用和不使用它来运行管道 . 解析JSON通过Jackon ObjectMapper和相应的类工作here . TablePartWindowFun取自here,用于为PCollection中的每个条目分配一个分区 .

管道适用于较小的文件而不是太多,但是对于我的真实数据集来说是中断 . 我选择了足够大的机器类型,并尝试设置最大数量的工作人员,以及使用自动调节最多100台n1-highmem-16机器 . 我已经尝试了流媒体和批处理模式以及每个工作者250到1200 GB的disSizeGb值 .

我现在能想到的可能解决方案是:

  • 解压缩GCS上的所有文件,从而启用工作程序之间的动态工作拆分,因为无法利用GCS的gzip transcoding

  • 在循环中构建"many"并行管道,每个管道仅处理90个文件的子集 .

在我看来,选项2就像编程“围绕”一个框架,还有另一种解决方案吗?

Addendum:

使用批处理模式读取gzip JSON文件后重新分区,最多100个工作者(类型为n1-highmem-4),管道运行大约一个小时,有12个工作人员,完成阅读以及重新分区的第一阶段 . 然后它可以扩展到100个工作人员并处理重新分区的PCollection . 完成后,图形如下所示:

Write to BQ Service Graph

有趣的是,当达到这个阶段时,首先它处理高达150万个元素/秒,然后进度下降到0.图片中GroupByKey步骤的OutputCollection的大小首先上升然后从大约3亿下降到0(总共约有18亿个元素 . 就像它丢弃了一些东西 . 此外, ExpandIterableParDo(Streaming Write) 结束时的运行时间为0.图片显示在运行"backwards"之前 . 在工作日志中,我看到一些 exception thrown while executing request 消息来自 com.google.api.client.http.HttpTransport Logger ,但我在Stackdriver中找不到更多信息 .

没有重新分区后读取管道失败使用 n1-highmem-2 实例与完全相同的步骤内的内存不足错误( GroupByKey 之后的所有内容) - 使用更大的实例类型会导致像

java.util.concurrent.ExecutionException: java.io.IOException: 
CANCELLED: Received RST_STREAM with error code 8 dataflow-...-harness-5l3s 
talking to frontendpipeline-..-harness-pc98:12346

2 回答

  • 1

    感谢Google Cloud 数据流团队的Dan和他提供的示例here,我能够解决这个问题 . 我做的唯一改变:

    • 在175 =(25周)大块的日子里循环,一个接一个地运行一个管道,以免压倒系统 . 在循环中,确保重新处理上一次迭代的最后一个文件,并以与底层数据相同的速度向前移动 startDate (175天) . 当使用 WriteDisposition.WRITE_TRUNCATE 时,以这种方式用正确的完整数据覆盖块结尾处的不完整日期 .

    • 在阅读gzip压缩文件后,使用上面提到的Repartition / Reshuffle转换,加快进程并允许更平滑的自动缩放

    • 使用DateTime而不是Instant类型,因为我的数据不是UTC格式

    更新(Apache Beam 2.0):

    随着Apache Beam 2.0的发布,解决方案变得更加容易 . 现在支持Sharding BigQuery输出表out of the box .

  • 0

    通过在运行管道时设置具有更高值的--numWorkers,尝试为管道分配更多资源可能是值得的 . 这是"Common Errors and Courses of Action"子章节中“在线排除故障”document中讨论的可能解决方案之一 .

相关问题