首页 文章

如何跨多个数据集和日期分区从Dataproc写入BigQuery?

提问于
浏览
0

我们有一个每日Dataproc流程,可以代表我们的客户从多个来源导入数据进行分析 . 目前,我们每天都没有收到大量数据,但预计会大幅增加 . 我们当前的流程有四个Dataproc Spark作业,可以在最终作业中导入,解析,加入和输出到Cloud SQL,在每个作业之间编写临时Avro文件 . 即使使用我们当前的数据级别,Cloud SQL也开始变得紧张(部分原因是由于一个公认的糟糕模式) . 我们想要转向BigQuery,所以我的第一个工作就是让第五个工作读取最终的Avro文件并输出到BigQuery,基本上与当前的Cloud SQL输出工作并行 .

使用Using the BigQuery Connector with Spark中的示例我已经解决了如何执行此操作,但需要更多复杂性 . 具体来说,我需要:

  • 将单个客户数据(多个客户的数据可以从单一来源到达)分离到单个数据集中

  • 根据"DateOfService"字段按天分区数据

我认为这样做的唯一方法是由customer和DateOfService创建单独的RDD,并将它们分别写入适当的数据集和表分区 . 我对此的担心是,对于单个RDD来说,写作似乎需要花费很长时间(几分钟),如果我必须编写几个单独的RDD,它可能会变得令人望而却步 .

到目前为止,这是我的代码的重要部分 . 其余的只是与Google的示例相同的所有配置,除了我的表架构和项目的值 .

// Read the processed data from Avro
val claimsRdd = sc.hadoopFile[AvroWrapper[GenericRecord], NullWritable, AvroInputFormat[GenericRecord]]("gs://path/to/avro/file")

// Convert from a RDD[Row] to RDD[String]. Conveniently these are JSON strings.
val claimsJson = claimsRdd.map(l => new String(l._1.datum.toString()))

// Convert into a RDD[(Null, JsonObject)]
val claimsJsonObj = claimsJson.map(s => (null, (new JsonParser).parse(s).getAsJsonObject))

// Write to BigQuery
claimsJsonObj.saveAsNewAPIHadoopDataset(conf)

我的问题是:

  • 这是多重RDD方法我描述了这样做的唯一或最佳方法吗?

  • 有没有更快的方法从Dataproc写入BigQuery?或者Dataflow在这方面更快?我可能能够重写到Dataflow,但我有一个用Scala编写的解析器,我必须重写为Java,我们利用Spark的SQL功能,并在Dataflow中解决如何做到这一点似乎有点令人生畏 . 但如果这是更好的方式我会考虑它 .

1 回答

  • 1

    Batch Dataflow目前不支持写入动态的BigQuery表集,因此您需要事先了解客户和日期 . 但是,如果是这种情况,Dataflow将有效地处理此问题并并行执行上载 .

    此外,可以直接在Dataflow管道中使用Scala代码;只需使用Scala程序中的Dataflow API即可 .

    我对Spark不太了解,所以无法评论最好的处理方法 .

相关问题