### load Data and check records
raw_df = spark.table("test.original")
raw_df.count()
lets say this table is partitioned based on column : **c_birth_year** and we would like to update the partition for year less than 1925
### Check data in few partitions.
sample = raw_df.filter(col("c_birth_year") <= 1925).select("c_customer_sk", "c_preferred_cust_flag")
print "Number of records: ", sample.count()
sample.show()
### Back-up the partitions before deletion
raw_df.filter(col("c_birth_year") <= 1925).write.saveAsTable("test.original_bkp", mode = "overwrite")
### UDF : To delete particular partition.
def delete_part(table, part):
qry = "ALTER TABLE " + table + " DROP IF EXISTS PARTITION (c_birth_year = " + str(part) + ")"
spark.sql(qry)
### Delete partitions
part_df = raw_df.filter(col("c_birth_year") <= 1925).select("c_birth_year").distinct()
part_list = part_df.rdd.map(lambda x : x[0]).collect()
table = "test.original"
for p in part_list:
delete_part(table, p)
### Do the required Changes to the columns in partitions
df = spark.table("test.original_bkp")
newdf = df.withColumn("c_preferred_cust_flag", lit("Y"))
newdf.select("c_customer_sk", "c_preferred_cust_flag").show()
### Write the Partitions back to Original table
newdf.write.insertInto("test.original")
### Verify data in Original table
orginial.filter(col("c_birth_year") <= 1925).select("c_customer_sk", "c_preferred_cust_flag").show()
Hope it helps.
Regards,
Neeraj
val sparkSession: SparkSession = SparkSession
.builder
.enableHiveSupport()
.config("spark.sql.sources.partitionOverwriteMode", "dynamic") // Required for overwriting ONLY the required partitioned folders, and not the entire root folder
.appName("spark_write_to_dynamic_partition_folders")
这里的用法:
DataFrame
.write
.format("<required file format>")
.partitionBy("<partitioned column name>")
.mode(SaveMode.Overwrite) // This is required.
.save(s"<path_to_root_folder>")
12 回答
在 insertInto 语句中添加“ overwrite=True”参数可解决此问题:
默认情况下
overwrite=False
。将其更改为True
可以覆盖df
和 partioned_table 中包含的特定分区。这有助于我们避免用df
覆盖 partioned_table 的全部内容。对于> = Spark 2.3.0:
这是一个普遍的问题。 Spark 最高 2.0 的唯一解决方案是直接写入分区目录 e.g. ,
如果您在 2.0 之前使用 Spark,则需要使用以下命令阻止 Spark 发出元数据文件(因为它们会破坏自动分区发现):
如果您在 1.6.2 之前使用 Spark,则还需要删除
/root/path/to/data/partition_col=value
中的_SUCCESS
文件,否则它的存在将破坏自动分区发现。 (我强烈建议使用 1.6.2 或 later.)您可以在防弹工作上的 Spark Summit 演讲中获得有关如何管理大型分区表的更多详细信息。
最后!现在,这是 Spark 2.3.0 中的功能:https://issues.apache.org/jira/browse/SPARK-20236
要使用它,您需要将spark.sql.sources.partitionOverwriteMode设置为动态,需要对数据集进行分区,并且写入模式overwrite。例:
我建议在写入之前根据您的分区列进行重新分区,因此每个文件夹最终不会包含 400 个文件。
在 Spark 2.3.0 之前,最好的解决方案是启动 SQL 语句以删除这些分区,然后使用模式追加将其写入。
使用 Spark 1.6...
HiveContext 可以大大简化此过程。关键在于您必须首先使用
CREATE EXTERNAL TABLE
语句(已定义分区)在 Hive 中创建表。例如:从这里,假设您有一个数据框,其中有一个特定分区(或多个分区)的新记录。您可以使用 HiveContext SQL 语句使用此 Dataframe 执行
INSERT OVERWRITE
,它将仅覆盖 Dataframe 中包含的分区的表:注意:此示例中的
update_dataframe
具有与目标test
表的模式匹配的模式。使用此方法容易犯的一个错误是跳过 Hive 中的
CREATE EXTERNAL TABLE
步骤,而仅使用 Dataframe API 的 write 方法创建表。特别是对于 Parquet-based 表,将无法正确定义该表以支持 Hive 的INSERT OVERWRITE... PARTITION
函数。希望这可以帮助。
我尝试了以下方法来覆盖 HIVE 表中的特定分区。
如果使用 DataFrame,则可能要对数据使用 Hive 表。在这种情况下,您只需要调用方法
它将覆盖 DataFrame 包含的分区。
无需指定格式(orc),因为 Spark 将使用 Hive 表格式。
在 Spark 版本 1.6 中运行正常
我建议您创建一个类似于目标表的临时表,然后在其中插入数据,而不是直接写入目标表。
建立表格后,您便会将资料写入
tmpLocation
然后,您将通过执行以下操作恢复表分区路径:
通过查询 Hive 元数据来获取分区路径,例如:
从
trgtTbl
删除这些分区,并将目录从tmpTbl
移到trgtTbl
作为 jatin Wrote,您可以从配置单元和路径中删除分区,然后追加数据。由于我浪费了太多时间,因此为其他 spark 用户添加了以下示例。我将 Scala 与 Spark 2.2.1 一起使用
}
您可以执行以下操作使工作可重新进入(幂等):(在 spark 2.2 上进行了尝试)
我建议您做 clean-up,然后以
Append
模式编写新分区:这将仅删除新分区。写入数据后,如果需要更新元存储,请运行以下命令:
注意:
deletePath
假设hfds
命令在您的系统上可用。使用 Scala 在 Spark 2.3.1 上对此进行了测试。上面的大多数答案都写入 Hive 表。但是,我想直接写到disk,它在此文件夹的顶部带有
external hive table
。首先需要配置
这里的用法: