首页 文章

如何定义DataFrame的分区?

提问于
浏览
103

我已经开始在Spark 1.4.0中使用Spark SQL和DataFrames . 我想在Scala中定义DataFrame上的自定义分区程序,但是没有看到如何执行此操作 .

我正在使用的一个数据表包含一个事务列表,按帐户,silimar到下面的示例 .

Account   Date       Type       Amount
1001    2014-04-01  Purchase    100.00
1001    2014-04-01  Purchase     50.00
1001    2014-04-05  Purchase     70.00
1001    2014-04-01  Payment    -150.00
1002    2014-04-01  Purchase     80.00
1002    2014-04-02  Purchase     22.00
1002    2014-04-04  Payment    -120.00
1002    2014-04-04  Purchase     60.00
1003    2014-04-02  Purchase    210.00
1003    2014-04-03  Purchase     15.00

至少在最初,大多数计算将发生在帐户内的交易之间 . 所以我希望对数据进行分区,以便帐户的所有事务都在同一个Spark分区中 .

但我没有看到定义这个的方法 . DataFrame类有一个名为'repartition(Int)'的方法,您可以在其中指定要创建的分区数 . 但我没有看到任何方法可用于为DataFrame定义自定义分区程序,例如可以为RDD指定 .

源数据存储在Parquet中 . 我确实看到在向Parquet编写DataFrame时,您可以指定要分区的列,因此我可以告诉Parquet通过“帐户”列对其数据进行分区 . 但是可能有数百万个帐户,如果我正确理解Parquet,它会为每个帐户创建一个独特的目录,这听起来不是一个合理的解决方案 .

有没有办法让Spark对此DataFrame进行分区,以便帐户的所有数据都在同一个分区中?

5 回答

  • 6

    Spark> = 2.3.0

    SPARK-22614公开范围分区 .

    val partitionedByRange = df.repartitionByRange(42, $"k")
    
    partitionedByRange.explain
    // == Parsed Logical Plan ==
    // 'RepartitionByExpression ['k ASC NULLS FIRST], 42
    // +- AnalysisBarrier Project [_1#2 AS k#5, _2#3 AS v#6]
    // 
    // == Analyzed Logical Plan ==
    // k: string, v: int
    // RepartitionByExpression [k#5 ASC NULLS FIRST], 42
    // +- Project [_1#2 AS k#5, _2#3 AS v#6]
    //    +- LocalRelation [_1#2, _2#3]
    // 
    // == Optimized Logical Plan ==
    // RepartitionByExpression [k#5 ASC NULLS FIRST], 42
    // +- LocalRelation [k#5, v#6]
    // 
    // == Physical Plan ==
    // Exchange rangepartitioning(k#5 ASC NULLS FIRST, 42)
    // +- LocalTableScan [k#5, v#6]
    

    SPARK-22389Data Source API v2中公开外部格式分区 .

    Spark> = 1.6.0

    在Spark> = 1.6中,可以按列使用分区进行查询和缓存 . 请参阅:SPARK-11410SPARK-4849使用 repartition 方法:

    val df = Seq(
      ("A", 1), ("B", 2), ("A", 3), ("C", 1)
    ).toDF("k", "v")
    
    val partitioned = df.repartition($"k")
    partitioned.explain
    
    // scala> df.repartition($"k").explain(true)
    // == Parsed Logical Plan ==
    // 'RepartitionByExpression ['k], None
    // +- Project [_1#5 AS k#7,_2#6 AS v#8]
    //    +- LogicalRDD [_1#5,_2#6], MapPartitionsRDD[3] at rddToDataFrameHolder at <console>:27
    // 
    // == Analyzed Logical Plan ==
    // k: string, v: int
    // RepartitionByExpression [k#7], None
    // +- Project [_1#5 AS k#7,_2#6 AS v#8]
    //    +- LogicalRDD [_1#5,_2#6], MapPartitionsRDD[3] at rddToDataFrameHolder at <console>:27
    // 
    // == Optimized Logical Plan ==
    // RepartitionByExpression [k#7], None
    // +- Project [_1#5 AS k#7,_2#6 AS v#8]
    //    +- LogicalRDD [_1#5,_2#6], MapPartitionsRDD[3] at rddToDataFrameHolder at <console>:27
    // 
    // == Physical Plan ==
    // TungstenExchange hashpartitioning(k#7,200), None
    // +- Project [_1#5 AS k#7,_2#6 AS v#8]
    //    +- Scan PhysicalRDD[_1#5,_2#6]
    

    RDDs 不同,Spark Dataset (包括 Dataset[Row] a.k.a DataFrame )目前无法使用自定义分区程序 . 您通常可以通过创建人工分区列来解决这个问题,但它不会给您相同的灵活性 .

    Spark <1.6.0:

    您可以做的一件事是在创建_545206之前预分区输入数据

    import org.apache.spark.sql.types._
    import org.apache.spark.sql.Row
    import org.apache.spark.HashPartitioner
    
    val schema = StructType(Seq(
      StructField("x", StringType, false),
      StructField("y", LongType, false),
      StructField("z", DoubleType, false)
    ))
    
    val rdd = sc.parallelize(Seq(
      Row("foo", 1L, 0.5), Row("bar", 0L, 0.0), Row("??", -1L, 2.0),
      Row("foo", -1L, 0.0), Row("??", 3L, 0.6), Row("bar", -3L, 0.99)
    ))
    
    val partitioner = new HashPartitioner(5) 
    
    val partitioned = rdd.map(r => (r.getString(0), r))
      .partitionBy(partitioner)
      .values
    
    val df = sqlContext.createDataFrame(partitioned, schema)
    

    由于 DataFrameRDD 创建只需要一个简单的 Map 阶段,现有的分区布局应该保留*:

    assert(df.rdd.partitions == partitioned.partitions)
    

    您可以使用相同的方式重新分配现有的 DataFrame

    sqlContext.createDataFrame(
      df.rdd.map(r => (r.getInt(1), r)).partitionBy(partitioner).values,
      df.schema
    )
    

    所以看起来并非不可能 . 问题仍然存在,如果它有意义的话 . 我认为大多数时候它不会:

    • 重新分区是一个昂贵的过程 . 在典型情况下,大多数数据必须被序列化,洗牌和反序列化 . 另一方面,可以从预分区数据中受益的操作数量相对较小,并且如果内部API不是为了利用该属性而进一步限制 .

    • 在某些情况下加入,但需要内部支持,

    • 窗口函数调用匹配的分区程序 . 与上面相同,仅限于单个窗口定义 . 它已经在内部进行了分区,因此预分区可能是多余的,
      使用 GROUP BY 进行

    • 简单聚合 - 可以减少临时缓冲区**的内存占用量,但总体成本要高得多 . 或多或少相当于 groupByKey.mapValues(_.reduce) (当前行为)vs reduceByKey (预分区) . 不太可能在实践中有用 .
      使用 SqlContext.cacheTable 进行

    • 数据压缩 . 由于它看起来像是使用行程编码,因此应用 OrderedRDDFunctions.repartitionAndSortWithinPartitions 可以提高压缩率 .

    • 性能高度依赖于密钥的分布 . 如果它是偏斜的,将导致次优的资源利用率 . 在最糟糕的情况下,根本不可能完成这项工作 .

    • 使用高级声明性API的一个重点是将自己与低级实现细节隔离开来 . 正如@dwysakowicz@RomiKuntsman已经提到的那样,优化是Catalyst Optimizer的工作 . 这是一个非常复杂的野兽,我真的怀疑你可以很容易地改进它,而不会深入到其内部 .

    相关概念

    Partitioning with JDBC sources

    JDBC数据源支持predicates argument . 它可以使用如下:

    sqlContext.read.jdbc(url, table, Array("foo = 1", "foo = 3"), props)
    

    它为每个谓词创建一个JDBC分区 . 请记住,如果使用单个谓词创建的集合不是不相交的,那么您将在结果表中看到重复项 .

    partitionBy method in DataFrameWriter

    Spark DataFrameWriter 提供 partitionBy 方法,可用于写入"partition"数据 . 它使用提供的列集分隔写入数据

    val df = Seq(
      ("foo", 1.0), ("bar", 2.0), ("foo", 1.5), ("bar", 2.6)
    ).toDF("k", "v")
    
    df.write.partitionBy("k").json("/tmp/foo.json")
    

    这样可以根据键启用谓词下推读取查询:

    val df1 = sqlContext.read.schema(df.schema).json("/tmp/foo.json")
    df1.where($"k" === "bar")
    

    但它不等于 DataFrame.repartition . 特别是聚合:

    val cnts = df1.groupBy($"k").sum()
    

    仍然需要 TungstenExchange

    cnts.explain
    
    // == Physical Plan ==
    // TungstenAggregate(key=[k#90], functions=[(sum(v#91),mode=Final,isDistinct=false)], output=[k#90,sum(v)#93])
    // +- TungstenExchange hashpartitioning(k#90,200), None
    //    +- TungstenAggregate(key=[k#90], functions=[(sum(v#91),mode=Partial,isDistinct=false)], output=[k#90,sum#99])
    //       +- Scan JSONRelation[k#90,v#91] InputPaths: file:/tmp/foo.json
    

    bucketBy method in DataFrameWriter (Spark> = 2.0):

    bucketBy 具有与 partitionBy 类似的应用程序,但它仅适用于表( saveAsTable ) . Bucketing信息可用于优化连接:

    // Temporarily disable broadcast joins
    spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
    
    df.write.bucketBy(42, "k").saveAsTable("df1")
    val df2 = Seq(("A", -1.0), ("B", 2.0)).toDF("k", "v2")
    df2.write.bucketBy(42, "k").saveAsTable("df2")
    
    // == Physical Plan ==
    // *Project [k#41, v#42, v2#47]
    // +- *SortMergeJoin [k#41], [k#46], Inner
    //    :- *Sort [k#41 ASC NULLS FIRST], false, 0
    //    :  +- *Project [k#41, v#42]
    //    :     +- *Filter isnotnull(k#41)
    //    :        +- *FileScan parquet default.df1[k#41,v#42] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/spark-warehouse/df1], PartitionFilters: [], PushedFilters: [IsNotNull(k)], ReadSchema: struct<k:string,v:int>
    //    +- *Sort [k#46 ASC NULLS FIRST], false, 0
    //       +- *Project [k#46, v2#47]
    //          +- *Filter isnotnull(k#46)
    //             +- *FileScan parquet default.df2[k#46,v2#47] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/spark-warehouse/df2], PartitionFilters: [], PushedFilters: [IsNotNull(k)], ReadSchema: struct<k:string,v2:double>
    

    *通过分区布局我的意思是只有数据分布 . partitioned RDD不再是分区程序 . **假设没有早期预测 . 如果聚合只涵盖小列的子集可能没有任何收益 .

  • 4

    在Spark <1.6中如果创建 HiveContext ,而不是普通的旧 SqlContext ,则可以使用HiveQL DISTRIBUTE BY colX... (确保每个N减少器获得x的非重叠范围)和 CLUSTER BY colX... (分配依据和排序依据的快捷方式);

    df.registerTempTable("partitionMe")
    hiveCtx.sql("select * from partitionMe DISTRIBUTE BY accountId SORT BY accountId, date")
    

    不确定这如何适用于Spark DF api . 普通的SqlContext不支持这些关键字(请注意,您不需要使用Hive元文件来使用HiveContext)

    编辑:Spark 1.6现在在本机DataFrame API中有这个

  • 2

    所以从某种答案开始:) - 你做不到

    我不是专家,但据我了解DataFrames,它们不等于rdd,而DataFrame没有Partitioner这样的东西 .

    通常,DataFrame的想法是提供另一个抽象级别来处理这些问题本身 . DataFrame上的查询被转换为逻辑计划,进一步转换为RDD上的操作 . 您建议的分区可能会自动应用,或者至少应该应用 .

    如果你不相信SparkSQL会提供某种最佳工作,你可以按照注释中的建议将DataFrame转换为RDD [Row] .

  • 148

    使用返回的DataFrame:

    yourDF.orderBy(account)
    

    没有明确的方法在DataFrame上使用partitionBy,仅在PairRDD上,但是当您对DataFrame进行排序时,它将在其LogicalPlan中使用它,这将有助于您在每个帐户上进行计算时 .

    我只是偶然发现了同样的问题,我希望按帐户划分数据帧 . 我假设当你说“想要对数据进行分区以便一个帐户的所有事务都在同一个Spark分区中”时,你想要它的规模和性能,但你的代码并不依赖它(比如使用mapPartitions()等),对吧?

  • 8

    我能够使用RDD做到这一点 . 但我不知道这是否适合您 . 将DF作为RDD提供后,可以应用repartitionAndSortWithinPartitions来执行数据的自定义重新分区 .

    这是我使用的示例:

    class DatePartitioner(partitions: Int) extends Partitioner {
    
      override def getPartition(key: Any): Int = {
        val start_time: Long = key.asInstanceOf[Long]
        Objects.hash(Array(start_time)) % partitions
      }
    
      override def numPartitions: Int = partitions
    }
    
    myRDD
      .repartitionAndSortWithinPartitions(new DatePartitioner(24))
      .map { v => v._2 }
      .toDF()
      .write.mode(SaveMode.Overwrite)
    

相关问题