首页 文章

将Spark数据帧保存为Hive中的动态分区表

提问于
浏览
20

我有一个示例应用程序正在从csv文件读取数据帧 . 可以使用方法 df.saveAsTable(tablename,mode) 将数据帧以镶木地板格式存储到Hive表中 .

上面的代码工作正常,但我每天都有如此多的数据,我想根据creationdate(表中的列)动态分区hive表 .

有没有办法动态分区数据帧并将其存储到配置单元仓库 . 想要避免使用 hivesqlcontext.sql(insert into table partittioin by(date)....) 对insert语句进行硬编码 .

问题可视为以下内容的延伸:How to save DataFrame directly to Hive?

任何帮助深表感谢 .

5 回答

  • 26

    我相信它的工作原理如下:

    df 是包含年,月和其他列的数据框

    df.write.partitionBy('year', 'month').saveAsTable(...)
    

    要么

    df.write.partitionBy('year', 'month').insertInto(...)
    
  • 19

    我能够使用 df.write().mode(SaveMode.Append).partitionBy("colname").saveAsTable("Table") 写入分区的配置单元表

    我必须启用以下属性才能使其正常工作 .

    hiveContext.setConf("hive.exec.dynamic.partition", "true")
    hiveContext.setConf("hive.exec.dynamic.partition.mode", "nonstrict")
    
  • 6

    我也面对同样的事情,但使用了我解决的以下技巧 .

    • 当我们将任何表分区为分区时,分区列将区分大小写 .

    • 分区列应存在于具有相同名称的DataFrame中(区分大小写) . 码:

    var dbName="your database name"
    var finaltable="your table name"
    
    // First check if table is available or not..
    if (sparkSession.sql("show tables in " + dbName).filter("tableName='" +finaltable + "'").collect().length == 0) {
         //If table is not available then it will create for you..
         println("Table Not Present \n  Creating table " + finaltable)
         sparkSession.sql("use Database_Name")
         sparkSession.sql("SET hive.exec.dynamic.partition = true")
         sparkSession.sql("SET hive.exec.dynamic.partition.mode = nonstrict ")
         sparkSession.sql("SET hive.exec.max.dynamic.partitions.pernode = 400")
         sparkSession.sql("create table " + dbName +"." + finaltable + "(EMP_ID        string,EMP_Name          string,EMP_Address               string,EMP_Salary    bigint)  PARTITIONED BY (EMP_DEP STRING)")
         //Table is created now insert the DataFrame in append Mode
         df.write.mode(SaveMode.Append).insertInto(empDB + "." + finaltable)
    }
    
  • 0

    这对我有用 . 我设置了这些设置,然后将数据放入分区表中 .

    from pyspark.sql import HiveContext
    sqlContext = HiveContext(sc)
    sqlContext.setConf("hive.exec.dynamic.partition", "true")
    sqlContext.setConf("hive.exec.dynamic.partition.mode", 
    "nonstrict")
    
  • 0

    这对我来说使用python和spark 2.1.0 .

    不确定这是否是最好的方法,但它的工作原理......

    # WRITE DATA INTO A HIVE TABLE
    import pyspark
    from pyspark.sql import SparkSession
    
    spark = SparkSession \
        .builder \
        .master("local[*]") \
        .config("hive.exec.dynamic.partition", "true") \
        .config("hive.exec.dynamic.partition.mode", "nonstrict") \
        .enableHiveSupport() \
        .getOrCreate()
    
    ### CREATE HIVE TABLE (with one row)
    spark.sql("""
    CREATE TABLE IF NOT EXISTS hive_df (col1 INT, col2 STRING, partition_bin INT)
    USING HIVE OPTIONS(fileFormat 'PARQUET')
    PARTITIONED BY (partition_bin)
    LOCATION 'hive_df'
    """)
    spark.sql("""
    INSERT INTO hive_df PARTITION (partition_bin = 0)
    VALUES (0, 'init_record')
    """)
    ###
    
    ### CREATE NON HIVE TABLE (with one row)
    spark.sql("""
    CREATE TABLE IF NOT EXISTS non_hive_df (col1 INT, col2 STRING, partition_bin INT)
    USING PARQUET
    PARTITIONED BY (partition_bin)
    LOCATION 'non_hive_df'
    """)
    spark.sql("""
    INSERT INTO non_hive_df PARTITION (partition_bin = 0)
    VALUES (0, 'init_record')
    """)
    ###
    
    ### ATTEMPT DYNAMIC OVERWRITE WITH EACH TABLE
    spark.sql("""
    INSERT OVERWRITE TABLE hive_df PARTITION (partition_bin)
    VALUES (0, 'new_record', 1)
    """)
    spark.sql("""
    INSERT OVERWRITE TABLE non_hive_df PARTITION (partition_bin)
    VALUES (0, 'new_record', 1)
    """)
    
    spark.sql("SELECT * FROM hive_df").show() # 2 row dynamic overwrite
    spark.sql("SELECT * FROM non_hive_df").show() # 1 row full table overwrite
    

相关问题