首页 文章

在 Spark DataFrame 写入方法中覆盖特定分区

提问于
浏览
46

我想覆盖特定的分区,而不是全部覆盖。我正在尝试以下命令:

df.write.orc('maprfs:///hdfs-base-path','overwrite',partitionBy='col4')

其中 df 是具有要覆盖的增量数据的数据帧。

hdfs-base-path 包含主数据。

当我尝试上述命令时,它将删除所有分区,并在 hdfs 路径中的 df 中插入这些分区。

我的要求是只覆盖指定 hdfs 路径中 df 中存在的那些分区。有人可以帮我吗?

12 回答

  • 0

    在 insertInto 语句中添加“ overwrite=True”参数可解决此问题:

    hiveContext.setConf("hive.exec.dynamic.partition", "true")
    hiveContext.setConf("hive.exec.dynamic.partition.mode", "nonstrict")
    
    df.write.mode("overwrite").insertInto("database_name.partioned_table", overwrite=True)
    

    默认情况下overwrite=False。将其更改为True可以覆盖df和 partioned_table 中包含的特定分区。这有助于我们避免用df覆盖 partioned_table 的全部内容。

  • 0

    对于> = Spark 2.3.0:

    spark.conf.set("spark.sql.sources.partitionOverwriteMode","dynamic")
    data.write.insertInto("partitioned_table", overwrite=True)
    
  • 41

    这是一个普遍的问题。 Spark 最高 2.0 的唯一解决方案是直接写入分区目录 e.g. ,

    df.write.mode(SaveMode.Overwrite).save("/root/path/to/data/partition_col=value")
    

    如果您在 2.0 之前使用 Spark,则需要使用以下命令阻止 Spark 发出元数据文件(因为它们会破坏自动分区发现):

    sc.hadoopConfiguration.set("parquet.enable.summary-metadata", "false")
    

    如果您在 1.6.2 之前使用 Spark,则还需要删除/root/path/to/data/partition_col=value中的_SUCCESS文件,否则它的存在将破坏自动分区发现。 (我强烈建议使用 1.6.2 或 later.)

    您可以在防弹工作上的 Spark Summit 演讲中获得有关如何管理大型分区表的更多详细信息。

  • 65

    最后!现在,这是 Spark 2.3.0 中的功能:https://issues.apache.org/jira/browse/SPARK-20236

    要使用它,您需要将spark.sql.sources.partitionOverwriteMode设置为动态,需要对数据集进行分区,并且写入模式overwrite。例:

    spark.conf.set("spark.sql.sources.partitionOverwriteMode","dynamic")
    data.write.mode("overwrite").insertInto("partitioned_table")
    

    我建议在写入之前根据您的分区列进行重新分区,因此每个文件夹最终不会包含 400 个文件。

    在 Spark 2.3.0 之前,最好的解决方案是启动 SQL 语句以删除这些分区,然后使用模式追加将其写入。

  • 6

    使用 Spark 1.6...

    HiveContext 可以大大简化此过程。关键在于您必须首先使用CREATE EXTERNAL TABLE语句(已定义分区)在 Hive 中创建表。例如:

    # Hive SQL
    CREATE EXTERNAL TABLE test
    (name STRING)
    PARTITIONED BY
    (age INT)
    STORED AS PARQUET
    LOCATION 'hdfs:///tmp/tables/test'
    

    从这里,假设您有一个数据框,其中有一个特定分区(或多个分区)的新记录。您可以使用 HiveContext SQL 语句使用此 Dataframe 执行INSERT OVERWRITE,它将仅覆盖 Dataframe 中包含的分区的表:

    # PySpark
    hiveContext = HiveContext(sc)
    update_dataframe.registerTempTable('update_dataframe')
    
    hiveContext.sql("""INSERT OVERWRITE TABLE test PARTITION (age)
                       SELECT name, age
                       FROM update_dataframe""")
    

    注意:此示例中的update_dataframe具有与目标test表的模式匹配的模式。

    使用此方法容易犯的一个错误是跳过 Hive 中的CREATE EXTERNAL TABLE步骤,而仅使用 Dataframe API 的 write 方法创建表。特别是对于 Parquet-based 表,将无法正确定义该表以支持 Hive 的INSERT OVERWRITE... PARTITION函数。

    希望这可以帮助。

  • 2

    我尝试了以下方法来覆盖 HIVE 表中的特定分区。

    ### load Data and check records
        raw_df = spark.table("test.original")
        raw_df.count()
    
    lets say this table is partitioned based on column : **c_birth_year** and we would like to update the partition for year less than 1925
    
    ### Check data in few partitions.
        sample = raw_df.filter(col("c_birth_year") <= 1925).select("c_customer_sk", "c_preferred_cust_flag")
        print "Number of records: ", sample.count()
        sample.show()
    
    ### Back-up the partitions before deletion
        raw_df.filter(col("c_birth_year") <= 1925).write.saveAsTable("test.original_bkp", mode = "overwrite")
    
    ### UDF : To delete particular partition.
        def delete_part(table, part):
            qry = "ALTER TABLE " + table + " DROP IF EXISTS PARTITION (c_birth_year = " + str(part) + ")"
            spark.sql(qry)
    
    ### Delete partitions
        part_df = raw_df.filter(col("c_birth_year") <= 1925).select("c_birth_year").distinct()
        part_list = part_df.rdd.map(lambda x : x[0]).collect()
    
        table = "test.original"
        for p in part_list:
            delete_part(table, p)
    
    ### Do the required Changes to the columns in partitions
        df = spark.table("test.original_bkp")
        newdf = df.withColumn("c_preferred_cust_flag", lit("Y"))
        newdf.select("c_customer_sk", "c_preferred_cust_flag").show()
    
    ### Write the Partitions back to Original table
        newdf.write.insertInto("test.original")
    
    ### Verify data in Original table
        orginial.filter(col("c_birth_year") <= 1925).select("c_customer_sk", "c_preferred_cust_flag").show()
    
    Hope it helps.
    
    Regards,
    
    Neeraj
    
  • 1

    如果使用 DataFrame,则可能要对数据使用 Hive 表。在这种情况下,您只需要调用方法

    df.write.mode(SaveMode.Overwrite).partitionBy("partition_col").insertInto(table_name)
    

    它将覆盖 DataFrame 包含的分区。

    无需指定格式(orc),因为 Spark 将使用 Hive 表格式。

    在 Spark 版本 1.6 中运行正常

  • 1

    我建议您创建一个类似于目标表的临时表,然后在其中插入数据,而不是直接写入目标表。

    CREATE TABLE tmpTbl LIKE trgtTbl LOCATION '<tmpLocation';
    

    建立表格后,您便会将资料写入tmpLocation

    df.write.mode("overwrite").partitionBy("p_col").orc(tmpLocation)
    

    然后,您将通过执行以下操作恢复表分区路径:

    MSCK REPAIR TABLE tmpTbl;
    

    通过查询 Hive 元数据来获取分区路径,例如:

    SHOW PARTITONS tmpTbl;
    

    trgtTbl删除这些分区,并将目录从tmpTbl移到trgtTbl

  • 1

    作为 jatin Wrote,您可以从配置单元和路径中删除分区,然后追加数据。由于我浪费了太多时间,因此为其他 spark 用户添加了以下示例。我将 Scala 与 Spark 2.2.1 一起使用

    import org.apache.hadoop.conf.Configuration
      import org.apache.hadoop.fs.Path
      import org.apache.spark.SparkConf
      import org.apache.spark.sql.{Column, DataFrame, SaveMode, SparkSession}
    
      case class DataExample(partition1: Int, partition2: String, someTest: String, id: Int)
    
     object StackOverflowExample extends App {
    //Prepare spark & Data
    val sparkConf = new SparkConf()
    sparkConf.setMaster(s"local[2]")
    val spark = SparkSession.builder().config(sparkConf).getOrCreate()
    val tableName = "my_table"
    
    val partitions1 = List(1, 2)
    val partitions2 = List("e1", "e2")
    val partitionColumns = List("partition1", "partition2")
    val myTablePath = "/tmp/some_example"
    
    val someText = List("text1", "text2")
    val ids = (0 until 5).toList
    
    val listData = partitions1.flatMap(p1 => {
      partitions2.flatMap(p2 => {
        someText.flatMap(
          text => {
            ids.map(
              id => DataExample(p1, p2, text, id)
            )
          }
        )
      }
      )
    })
    
    val asDataFrame = spark.createDataFrame(listData)
    
    //Delete path function
    def deletePath(path: String, recursive: Boolean): Unit = {
      val p = new Path(path)
      val fs = p.getFileSystem(new Configuration())
      fs.delete(p, recursive)
    }
    
    def tableOverwrite(df: DataFrame, partitions: List[String], path: String): Unit = {
      if (spark.catalog.tableExists(tableName)) {
        //clean partitions
        val asColumns = partitions.map(c => new Column(c))
        val relevantPartitions = df.select(asColumns: _*).distinct().collect()
        val partitionToRemove = relevantPartitions.map(row => {
          val fields = row.schema.fields
          s"ALTER TABLE ${tableName} DROP IF EXISTS PARTITION " +
            s"${fields.map(field => s"${field.name}='${row.getAs(field.name)}'").mkString("(", ",", ")")} PURGE"
        })
    
        val cleanFolders = relevantPartitions.map(partition => {
          val fields = partition.schema.fields
          path + fields.map(f => s"${f.name}=${partition.getAs(f.name)}").mkString("/")
        })
    
        println(s"Going to clean ${partitionToRemove.size} partitions")
        partitionToRemove.foreach(partition => spark.sqlContext.sql(partition))
        cleanFolders.foreach(partition => deletePath(partition, true))
      }
      asDataFrame.write
        .options(Map("path" -> myTablePath))
        .mode(SaveMode.Append)
        .partitionBy(partitionColumns: _*)
        .saveAsTable(tableName)
    }
    
    //Now test
    tableOverwrite(asDataFrame, partitionColumns, tableName)
    spark.sqlContext.sql(s"select * from $tableName").show(1000)
    tableOverwrite(asDataFrame, partitionColumns, tableName)
    
    import spark.implicits._
    
    val asLocalSet = spark.sqlContext.sql(s"select * from $tableName").as[DataExample].collect().toSet
    if (asLocalSet == listData.toSet) {
      println("Overwrite is working !!!")
    }
    

    }

  • 0

    您可以执行以下操作使工作可重新进入(幂等):(在 spark 2.2 上进行了尝试)

    # drop the partition
    drop_query = "ALTER TABLE table_name DROP IF EXISTS PARTITION (partition_col='{val}')".format(val=target_partition)
    print drop_query
    spark.sql(drop_query)
    
    # delete directory
    dbutils.fs.rm(<partition_directoy>,recurse=True)
    
    # Load the partition
    df.write\
      .partitionBy("partition_col")\
      .saveAsTable(table_name, format = "parquet", mode = "append", path = <path to parquet>)
    
  • 0

    我建议您做 clean-up,然后以Append模式编写新分区:

    import scala.sys.process._
    def deletePath(path: String): Unit = {
        s"hdfs dfs -rm -r -skipTrash $path".!
    }
    
    df.select(partitionColumn).distinct.collect().foreach(p => {
        val partition = p.getAs[String](partitionColumn)
        deletePath(s"$path/$partitionColumn=$partition")
    })
    
    df.write.partitionBy(partitionColumn).mode(SaveMode.Append).orc(path)
    

    这将仅删除新分区。写入数据后,如果需要更新元存储,请运行以下命令:

    sparkSession.sql(s"MSCK REPAIR TABLE $db.$table")
    

    注意: deletePath假设hfds命令在您的系统上可用。

  • 0

    使用 Scala 在 Spark 2.3.1 上对此进行了测试。上面的大多数答案都写入 Hive 表。但是,我想直接写到disk,它在此文件夹的顶部带有external hive table

    首先需要配置

    val sparkSession: SparkSession = SparkSession
          .builder
          .enableHiveSupport()
          .config("spark.sql.sources.partitionOverwriteMode", "dynamic") // Required for overwriting ONLY the required partitioned folders, and not the entire root folder
          .appName("spark_write_to_dynamic_partition_folders")
    

    这里的用法:

    DataFrame
    .write
    .format("<required file format>")
    .partitionBy("<partitioned column name>")
    .mode(SaveMode.Overwrite) // This is required.
    .save(s"<path_to_root_folder>")
    

相关问题