How do I add a new column to a Spark DataFrame (using PySpark)?

I have a Spark DataFrame (using PySpark 1.5.1) and would like to add a new column.

I've tried the following without any success:

type(randomed_hours) # => list

# Create in Python and transform to RDD
new_col = pd.DataFrame(randomed_hours, columns=['new_col'])
spark_new_col = sqlContext.createDataFrame(new_col)

my_df_spark.withColumn("hours", spark_new_col["new_col"])

I also got an error using this:

my_df_spark.withColumn("hours",  sc.parallelize(randomed_hours))

So how do I add a new column (based on a Python vector) to an existing DataFrame with PySpark?

6 Answers

  • 133

    You cannot add an arbitrary column to a DataFrame in Spark. New columns can be created only by using literals (other literal types are described in How to add a constant column in a Spark DataFrame?)

    from pyspark.sql.functions import lit
    
    df = sqlContext.createDataFrame(
        [(1, "a", 23.0), (3, "B", -23.0)], ("x1", "x2", "x3"))
    
    df_with_x4 = df.withColumn("x4", lit(0))
    df_with_x4.show()
    
    ## +---+---+-----+---+
    ## | x1| x2|   x3| x4|
    ## +---+---+-----+---+
    ## |  1|  a| 23.0|  0|
    ## |  3|  B|-23.0|  0|
    ## +---+---+-----+---+
    

    transforming an existing column:

    from pyspark.sql.functions import exp
    
    df_with_x5 = df_with_x4.withColumn("x5", exp("x3"))
    df_with_x5.show()
    
    ## +---+---+-----+---+--------------------+
    ## | x1| x2|   x3| x4|                  x5|
    ## +---+---+-----+---+--------------------+
    ## |  1|  a| 23.0|  0| 9.744803446248903E9|
    ## |  3|  B|-23.0|  0|1.026187963170189...|
    ## +---+---+-----+---+--------------------+
    

    including using join:

    from pyspark.sql.functions import col

    lookup = sqlContext.createDataFrame([(1, "foo"), (2, "bar")], ("k", "v"))
    df_with_x6 = (df_with_x5
        .join(lookup, col("x1") == col("k"), "leftouter")
        .drop("k")
        .withColumnRenamed("v", "x6"))
    df_with_x6.show()
    
    ## +---+---+-----+---+--------------------+----+
    ## | x1| x2|   x3| x4|                  x5|  x6|
    ## +---+---+-----+---+--------------------+----+
    ## |  1|  a| 23.0|  0| 9.744803446248903E9| foo|
    ## |  3|  B|-23.0|  0|1.026187963170189...|null|
    ## +---+---+-----+---+--------------------+----+
    

    or generated with a function / udf:

    from pyspark.sql.functions import rand
    
    df_with_x7 = df_with_x6.withColumn("x7", rand())
    df_with_x7.show()
    
    ## +---+---+-----+---+--------------------+----+-------------------+
    ## | x1| x2|   x3| x4|                  x5|  x6|                 x7|
    ## +---+---+-----+---+--------------------+----+-------------------+
    ## |  1|  a| 23.0|  0| 9.744803446248903E9| foo|0.41930610446846617|
    ## |  3|  B|-23.0|  0|1.026187963170189...|null|0.37801881545497873|
    ## +---+---+-----+---+--------------------+----+-------------------+
    

    Performance-wise, built-in functions (pyspark.sql.functions), which map to Catalyst expressions, are usually preferred over Python user-defined functions.

    If you want to add the content of an arbitrary RDD as a column, you can add a row index to the existing DataFrame, call zipWithIndex on the RDD and convert it to a DataFrame, then join both using the index as a join key, as sketched below.
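
    A rough sketch of that route, reusing the question's my_df_spark and randomed_hours names (both borrowed from the question, so assumptions here):

    from pyspark.sql import Row

    # 1) attach a row index to the existing DataFrame
    indexed_df = (my_df_spark.rdd
        .zipWithIndex()
        .map(lambda pair: Row(idx=pair[1], **pair[0].asDict()))
        .toDF())

    # 2) attach the same kind of index to the new values
    hours_df = (sc.parallelize(randomed_hours)
        .zipWithIndex()
        .map(lambda pair: Row(idx=pair[1], hours=pair[0]))
        .toDF())

    # 3) join on the index and drop it again
    result = indexed_df.join(hours_df, "idx").drop("idx")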

  • -1

    For Spark 2.0:

    # assumes schema has 'age' column 
    df.select('*', (df.age + 10).alias('agePlusTen'))
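
    For the same case, an equivalent withColumn form (assuming the same hypothetical age column) may read more naturally:

    # append the derived column instead of re-selecting everything
    df_plus_ten = df.withColumn('agePlusTen', df.age + 10)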
    
  • 45
    from pyspark.sql.functions import udf
    from pyspark.sql.types import *
    func_name = udf(
        lambda val: val, # do sth to val
        StringType()
    )
    df.withColumn('new_col', func_name(df.old_col))
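
    For instance, a concrete (purely illustrative) use of this pattern, reusing the imports above, could be normalising a string column to upper case:

    # illustrative: upper-case an existing string column into a new one
    to_upper = udf(lambda val: val.upper() if val is not None else None, StringType())
    df = df.withColumn('new_col', to_upper(df.old_col))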
    
  • 20

    I would like to offer a generalized example for a very similar use case:

    Use Case: I have a csv consisting of:

    First|Third|Fifth
    data|data|data
    data|data|data
    ...billion more lines
    

    I need to perform some transformations, and the final csv needs to look like:

    First|Second|Third|Fourth|Fifth
    data|null|data|null|data
    data|null|data|null|data
    ...billion more lines
    

    I need to do this because this is the schema defined by some model, and my final data needs to be interoperable with SQL bulk inserts and such things.

    So:

    1) I read the original csv using spark.read and call it "df".

    2) I do something to the data.

    3) I add the null columns using this script:

    from pyspark.sql.functions import lit
    from pyspark.sql.types import StringType

    outcols = []
    for column in MY_COLUMN_LIST:
        if column in df.columns:
            outcols.append(column)
        else:
            outcols.append(lit(None).cast(StringType()).alias('{0}'.format(column)))

    df = df.select(outcols)
    

    In this way, you can structure your schema after loading a csv (this also works for reordering columns if you have to do this for many tables).
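
    For completeness, a minimal end-to-end sketch of how the script above might be used for the csv in this example (the file names, separator and MY_COLUMN_LIST are assumptions):

    from pyspark.sql.functions import lit
    from pyspark.sql.types import StringType

    MY_COLUMN_LIST = ["First", "Second", "Third", "Fourth", "Fifth"]  # target schema

    df = spark.read.csv("original.csv", sep="|", header=True)  # only First|Third|Fifth exist

    # same idea as the loop above, written as a comprehension
    outcols = [column if column in df.columns
               else lit(None).cast(StringType()).alias(column)
               for column in MY_COLUMN_LIST]

    df.select(outcols).write.csv("final.csv", sep="|", header=True, nullValue="null")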

  • 0

    You can define a new udf when adding a column_name:

    import pyspark.sql.functions as F
    from pyspark.sql.types import StringType

    # yourstring is a placeholder for the constant value you want to add
    u_f = F.udf(lambda: yourstring, StringType())
    a.select(u_f().alias('column_name'))
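
    As a side note, since the value here is a constant, the built-in lit() shown in the top answer gives the same result without the overhead of a Python udf:

    # same result without a udf (yourstring is still a placeholder)
    a.withColumn('column_name', F.lit(yourstring))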
    
  • -1

    To add a column using a UDF:

    df = sqlContext.createDataFrame(
        [(1, "a", 23.0), (3, "B", -23.0)], ("x1", "x2", "x3"))
    
    from pyspark.sql.functions import udf
    from pyspark.sql.types import *
    
    def valueToCategory(value):
       if   value == 1: return 'cat1'
       elif value == 2: return 'cat2'
       # ... more categories ...
       else: return 'n/a'
    
    # NOTE: it seems that calls to udf() must be after SparkContext() is called
    udfValueToCategory = udf(valueToCategory, StringType())
    df_with_cat = df.withColumn("category", udfValueToCategory("x1"))
    df_with_cat.show()
    
    ## +---+---+-----+---------+
    ## | x1| x2|   x3| category|
    ## +---+---+-----+---------+
    ## |  1|  a| 23.0|     cat1|
    ## |  3|  B|-23.0|      n/a|
    ## +---+---+-----+---------+
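
    As a side note, for a small fixed mapping like this, the built-in when / otherwise expressions can produce the same column without a Python udf; a sketch under that assumption:

    from pyspark.sql.functions import when

    df_with_cat = df.withColumn(
        "category",
        when(df.x1 == 1, "cat1")
        .when(df.x1 == 2, "cat2")
        .otherwise("n/a"))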
    
