首页 文章

使用Spark通过s3a将镶木地板文件写入s3非常慢

提问于
浏览
17

我正在尝试使用 Spark 1.6.1parquet 文件写入 Amazon S3 . 我正在生成的小 parquet~2GB 曾经写过's not that much data. I' m试图证明 Spark 作为我可以使用的平台 .

基本上我要做的是设置 star schemadataframes ,然后我'm going to write those tables out to parquet. The data comes in from csv files provided by a vendor and I' m使用Spark作为 ETL 平台 . 我目前在 ec2(r3.2xlarge) 中有一个3节点集群,所以 Actuator 上的内存为 120GB ,总共有16个内核 .

输入文件总共大约22GB,我现在提取大约2GB的数据 . 最后,当我开始加载完整数据集时,这将是几TB .

这是我的spark / scala pseudocode

def loadStage(): Unit = {
    sc.hadoopConfiguration.set("fs.s3a.buffer.dir", "/tmp/tempData")
    sc.hadoopConfiguration.set("spark.sql.parquet.output.committer.class","org.apache.spark.sql.parquet.DirectParquetOutputCommitter")
    sc.hadoopConfiguration.set("spark.sql.hive.convertMetastoreParquet","false")
    var sqlCtx = new SQLContext(sc)


    val DataFile = sc.textFile("s3a://my-bucket/archive/*/file*.gz")

    //Setup header table/df
    val header_rec = DataFile.map(_.split("\\|")).filter(x=> x(0) == "1")
    val headerSchemaDef = "market_no,rel_date,field1, field2, field3....."
    val headerSchema = StructType(headerSchemaDef.split(",").map(fieldName => StructField(fieldName, StringType,false)))
    val headerRecords = header_rec.map(p => Row(p(3), p(8), p(1), p(2), p(4), p(5), p(6) ))
    val header = sqlCtx.createDataFrame(headerRecords, headerSchema)
    header.registerTempTable("header")
    sqlCtx.cacheTable("header")


    //Setup fact table/df
    val fact_recs = DataFile.map(_.split("\\|")).filter(x=> x(0) == "2")
    val factSchemaDef = "market_no,rel_date,field1, field2, field3....."
    val factSchema = StructType(factSchemaDef.split(",").map(fieldName => StructField(fieldName, StringType,false)))
    val records = fact_recs.map(p => Row(p(11), p(12), p(1), p(2), p(3), p(4), p(5), p(6), p(7), p(8), p(9), p(10)))
    val df = sqlCtx.createDataFrame(records, factSchema)
    df.registerTempTable("fact")

    val results = sqlCtx.sql("select fact.* from header inner join fact on fact.market_no = header.market_no and fact.rel_date = header.rel_date")


    println(results.count())



    results.coalesce(1).write.mode(SaveMode.Overwrite).parquet("s3a://my-bucket/a/joined_data.parquet")


  }

465884512行的计数大约需要2分钟 . 对拼花地板的写作需要 38 minutes

据我所知, coalesce 对写入的驱动程序进行了随机播放....但是它在做错事的时间量上做了很多 . 没有 coalesce ,这仍然需要15分钟,IMO仍然太长,并给了我一些小的 parquet 文件 . 我有'd like to have one large file per day of data that I' . 我有代码按字段值进行分区,而且速度也很慢 . 我也尝试将此输出到 csv ,这需要约1小时 .

另外,当我提交工作时,我并没有真正设置运行时道具 . 我的一项工作的控制台统计信息是:

  • 活着的 Worker :2

  • 正在使用的核心:16总计,16使用

  • 正在使用的内存:总计117.5 GB,使用107.5 GB

  • 应用程序:1运行,5完成

  • 驱动程序:0正在运行,0已完成

  • 状态:活着

2 回答

  • 4

    在I / O操作期间,Spark默认值会导致大量(可能)不必要的开销,尤其是在写入S3时 . This article对此进行了更全面的讨论,但有两种设置需要考虑更改 .

    • 使用DirectParquetOutputCommitter . 默认情况下,Spark会将所有数据保存到临时文件夹,然后再移动这些文件 . 使用DirectParquetOutputCommitter可以通过直接写入S3输出路径来节省时间

    • No longer available in Spark 2.0+

    • 如jira票中所述,目前的解决方案是

    将代码切换为使用s3a和Hadoop 2.7.2;它全面改善,在Hadoop 2.8中变得更好,并且是s3guard的基础使用Hadoop FileOutputCommitter并将mapreduce.fileoutputcommitter.algorithm.version设置为2

    -Schema合并默认情况下关闭Spark 1.5关闭模式合并 . 如果启用了架构合并,则驱动程序节点将扫描所有文件以确保一致的架构 . 这特别昂贵,因为它不是分布式操作 . 确保通过执行此操作来关闭此功能

    val file = sqx.read.option(“mergeSchema”,“false”) . parquet(path)

  • 18

    直接输出提交器已从火花代码库中消失;你要在自己的JAR中编写自己的/恢复已删除的代码 . 如果你这样做,在工作中关闭推测,并知道其他失败也会导致问题,问题是“无效数据” .

    更明亮的是,Hadoop 2.8将增加一些S3A加速,专门用于读取S3的优化二进制格式(ORC,Parquet);有关详细信息,请参阅HADOOP-11694 . 有些人正在使用Amazon Dynamo来实现一致的元数据存储,这应该能够在工作结束时执行强大的O(1)提交 .

相关问题