首页 文章

如何在Spark SQL的DataFrame中更改列类型?

提问于
浏览
119

假设我做的事情如下:

val df = sqlContext.load("com.databricks.spark.csv", Map("path" -> "cars.csv", "header" -> "true"))
df.printSchema()

root
 |-- year: string (nullable = true)
 |-- make: string (nullable = true)
 |-- model: string (nullable = true)
 |-- comment: string (nullable = true)
 |-- blank: string (nullable = true)

df.show()
year make  model comment              blank
2012 Tesla S     No comment                
1997 Ford  E350  Go get one now th...

但我真的希望 yearInt (并且可能会转换其他一些列) .

我能想到的最好的是

df.withColumn("year2", 'year.cast("Int")).select('year2 as 'year, 'make, 'model, 'comment, 'blank)
org.apache.spark.sql.DataFrame = [year: int, make: string, model: string, comment: string, blank: string]

这有点令人费解 .

我来自R,而且我习惯于写作,例如

df2 <- df %>%
   mutate(year = year %>% as.integer, 
          make = make %>% toupper)

我可能会遗漏一些东西,因为在spark / scala中应该有更好的方法来做到这一点......

16 回答

  • 3

    从Spark 1.4版开始,您可以在列上应用带有DataType的强制转换方法:

    import org.apache.spark.sql.types.IntegerType
    val df2 = df.withColumn("yearTmp", df.year.cast(IntegerType))
        .drop("year")
        .withColumnRenamed("yearTmp", "year")
    

    如果您使用的是sql表达式,您还可以:

    val df2 = df.selectExpr("cast(year as int) year", 
                            "make", 
                            "model", 
                            "comment", 
                            "blank")
    

    有关更多信息,请查看文档:http://spark.apache.org/docs/1.6.0/api/scala/#org.apache.spark.sql.DataFrame

  • 54

    [编辑:2016年3月:感谢投票!虽然真的,这不是最好的答案,我认为msemelman,Martin Senne和其他人提出的基于 withColumnwithColumnRenamedcast 的解决方案更简单,更清洁 .

    我认为你的方法还可以,回想一下Spark DataFrame 是Rows的一个(不可变的)RDD,所以我们永远不会真正替换一个列,只是每次使用新模式创建新的 DataFrame .

    假设您有一个具有以下架构的原始df:

    scala> df.printSchema
    root
     |-- Year: string (nullable = true)
     |-- Month: string (nullable = true)
     |-- DayofMonth: string (nullable = true)
     |-- DayOfWeek: string (nullable = true)
     |-- DepDelay: string (nullable = true)
     |-- Distance: string (nullable = true)
     |-- CRSDepTime: string (nullable = true)
    

    并且在一列或多列上定义了一些UDF:

    import org.apache.spark.sql.functions._
    
    val toInt    = udf[Int, String]( _.toInt)
    val toDouble = udf[Double, String]( _.toDouble)
    val toHour   = udf((t: String) => "%04d".format(t.toInt).take(2).toInt ) 
    val days_since_nearest_holidays = udf( 
      (year:String, month:String, dayOfMonth:String) => year.toInt + 27 + month.toInt-12
     )
    

    更改列类型甚至从另一个构建新的DataFrame可以这样写:

    val featureDf = df
    .withColumn("departureDelay", toDouble(df("DepDelay")))
    .withColumn("departureHour",  toHour(df("CRSDepTime")))
    .withColumn("dayOfWeek",      toInt(df("DayOfWeek")))              
    .withColumn("dayOfMonth",     toInt(df("DayofMonth")))              
    .withColumn("month",          toInt(df("Month")))              
    .withColumn("distance",       toDouble(df("Distance")))              
    .withColumn("nearestHoliday", days_since_nearest_holidays(
                  df("Year"), df("Month"), df("DayofMonth"))
                )              
    .select("departureDelay", "departureHour", "dayOfWeek", "dayOfMonth", 
            "month", "distance", "nearestHoliday")
    

    产量:

    scala> df.printSchema
    root
     |-- departureDelay: double (nullable = true)
     |-- departureHour: integer (nullable = true)
     |-- dayOfWeek: integer (nullable = true)
     |-- dayOfMonth: integer (nullable = true)
     |-- month: integer (nullable = true)
     |-- distance: double (nullable = true)
     |-- nearestHoliday: integer (nullable = true)
    

    这非常接近您自己的解决方案 . 简单地说,将类型更改和其他转换保持为单独的 udf val s使代码更具可读性和可重用性 .

  • 15

    由于 cast 操作可用于Spark Column (并且由于我个人不赞成@652832_此时提出的 udf ),如何:

    df.select( df("year").cast(IntegerType).as("year"), ... )
    

    转换为请求的类型?作为一个整洁的副作用,在这个意义上,不能施展的 Value 将变成 null .

    如果您需要 a helper method ,请使用:

    object DFHelper{
      def castColumnTo( df: DataFrame, cn: String, tpe: DataType ) : DataFrame = {
        df.withColumn( cn, df(cn).cast(tpe) )
      }
    }
    

    使用如下:

    import DFHelper._
    val df2 = castColumnTo( df, "year", IntegerType )
    
  • 30

    首先,如果你想要铸造类型

    import org.apache.spark.sql
    df.withColumn("year", $"year".cast(sql.types.IntegerType))
    

    使用相同的列名称,该列将替换为新列,您无需添加和删除 .

    第二,关于Scala vs R. Scala代码与R最相似,我可以实现:

    val df2 = df.select(
       df.columns.map {
         case year @ "year" => df(year).cast(IntegerType).as(year)
         case make @ "make" => functions.upper(df(make)).as(make)
         case other         => df(other)
       }: _*
    )
    

    虽然长度比R长一点 . 请注意, mutate 是R数据帧的一个函数,因此Scala在不使用特殊函数的情况下表现出的功率非常好 .

    df.columns 令人惊讶的是一个Array [String]而不是Array [Column],也许他们希望它看起来像Python pandas的数据帧 . )

  • -1

    您可以使用 selectExpr 使其更清洁:

    df.selectExpr("cast(year as int) as year", "upper(make) as make",
        "model", "comment", "blank")
    
  • 7

    要将年份从字符串转换为int,您可以将以下选项添加到csv阅读器:"inferSchema" - > "true",请参阅DataBricks documentation

  • 82

    用于将DataFrame的数据类型从String修改为Integer的Java代码

    df.withColumn("col_name", df.col("col_name").cast(DataTypes.IntegerType))
    

    它将简单地将现有(String数据类型)强制转换为Integer .

  • 112

    所以这只有在你将问题保存到像sqlserver这样的jdbc驱动程序时才能真正起作用,但它对于你将遇到语法和类型的错误非常有帮助 .

    import org.apache.spark.sql.jdbc.{JdbcDialects, JdbcType, JdbcDialect}
    import org.apache.spark.sql.jdbc.JdbcType
    val SQLServerDialect = new JdbcDialect {
      override def canHandle(url: String): Boolean = url.startsWith("jdbc:jtds:sqlserver") || url.contains("sqlserver")
    
      override def getJDBCType(dt: DataType): Option[JdbcType] = dt match {
        case StringType => Some(JdbcType("VARCHAR(5000)", java.sql.Types.VARCHAR))
        case BooleanType => Some(JdbcType("BIT(1)", java.sql.Types.BIT))
        case IntegerType => Some(JdbcType("INTEGER", java.sql.Types.INTEGER))
        case LongType => Some(JdbcType("BIGINT", java.sql.Types.BIGINT))
        case DoubleType => Some(JdbcType("DOUBLE PRECISION", java.sql.Types.DOUBLE))
        case FloatType => Some(JdbcType("REAL", java.sql.Types.REAL))
        case ShortType => Some(JdbcType("INTEGER", java.sql.Types.INTEGER))
        case ByteType => Some(JdbcType("INTEGER", java.sql.Types.INTEGER))
        case BinaryType => Some(JdbcType("BINARY", java.sql.Types.BINARY))
        case TimestampType => Some(JdbcType("DATE", java.sql.Types.DATE))
        case DateType => Some(JdbcType("DATE", java.sql.Types.DATE))
        //      case DecimalType.Fixed(precision, scale) => Some(JdbcType("NUMBER(" + precision + "," + scale + ")", java.sql.Types.NUMERIC))
        case t: DecimalType => Some(JdbcType(s"DECIMAL(${t.precision},${t.scale})", java.sql.Types.DECIMAL))
        case _ => throw new IllegalArgumentException(s"Don't know how to save ${dt.json} to JDBC")
      }
    }
    
    JdbcDialects.registerDialect(SQLServerDialect)
    
  • 4
    df.select($"long_col".cast(IntegerType).as("int_col"))
    
  • 3

    建议使用cast,FYI,spark 1.4.1中的cast方法的答案被打破了 .

    例如,当转换为bigint时,具有值为“8182175552014127960”的字符串列的数据帧具有值“8182175552014128100”

    df.show
    +-------------------+
    |                  a|
    +-------------------+
    |8182175552014127960|
    +-------------------+
    
        df.selectExpr("cast(a as bigint) a").show
    +-------------------+
    |                  a|
    +-------------------+
    |8182175552014128100|
    +-------------------+
    

    在找到这个bug之前我们不得不面对很多问题,因为我们在 生产环境 中有bigint列 .

  • 6

    生成包含五个值的简单数据集,并将 int 转换为 string 类型:

    val df = spark.range(5).select( col("id").cast("string") )
    
  • 8

    此方法将删除旧列并创建具有相同值和新数据类型的新列 . 创建DataFrame时的原始数据类型是: -

    root
     |-- id: integer (nullable = true)
     |-- flag1: string (nullable = true)
     |-- flag2: string (nullable = true)
     |-- name: string (nullable = true)
     |-- flag3: string (nullable = true)
    

    在此之后,我运行以下代码来更改数据类型: -

    df=df.withColumnRenamed(<old column name>,<dummy column>) // This was done for both flag1 and flag3
    df=df.withColumn(<old column name>,df.col(<dummy column>).cast(<datatype>)).drop(<dummy column>)
    

    在此之后我的结果出现了: -

    root
     |-- id: integer (nullable = true)
     |-- flag2: string (nullable = true)
     |-- name: string (nullable = true)
     |-- flag1: boolean (nullable = true)
     |-- flag3: boolean (nullable = true)
    
  • -1

    您可以使用以下代码 .

    df.withColumn("year", df("year").cast(IntegerType))
    

    year 列转换为 IntegerType 列 .

  • 0

    可以通过在spark sql中使用强制转换来更改列的数据类型 . 表名是表,它有两列只有column1和column2,column1数据类型要更改 . ex-spark.sql(“select cast(column1 as Double)column1NewName,column2 from table”)在double中写入你的数据类型 .

  • 2
    val fact_df = df.select($"data"(30) as "TopicTypeId", $"data"(31) as "TopicId",$"data"(21).cast(FloatType).as( "Data_Value_Std_Err")).rdd
        //Schema to be applied to the table
        val fact_schema = (new StructType).add("TopicTypeId", StringType).add("TopicId", StringType).add("Data_Value_Std_Err", FloatType)
    
        val fact_table = sqlContext.createDataFrame(fact_df, fact_schema).dropDuplicates()
    
  • 2

    其他方式:

    // Generate a simple dataset containing five values and convert int to string type
    
    val df = spark.range(5).select( col("id").cast("string")).withColumnRenamed("id","value")
    

相关问题