我有一个Spark DataFrame(使用PySpark 1.5.1)并想添加一个新列 .
我试过以下但没有成功:
type(randomed_hours) # => list
# Create in Python and transform to RDD
new_col = pd.DataFrame(randomed_hours, columns=['new_col'])
spark_new_col = sqlContext.createDataFrame(new_col)
my_df_spark.withColumn("hours", spark_new_col["new_col"])
使用这个也有错误:
my_df_spark.withColumn("hours", sc.parallelize(randomed_hours))
那么如何使用PySpark将新列(基于Python向量)添加到现有的DataFrame中?
6 回答
您无法在Spark中的
DataFrame
中添加任意列 . 只能使用文字创建新列(How to add a constant column in a Spark DataFrame?中描述了其他文字类型)转换现有列:
包括使用
join
:或使用function / udf生成:
性能方面的内置函数(
pyspark.sql.functions
)映射到Catalyst表达式,通常比Python用户定义的函数更受欢迎 .如果要将任意RDD的内容添加为列,则可以
添加row numbers to existing data frame
在RDD上调用
zipWithIndex
并将其转换为数据帧使用索引作为连接键加入
对于Spark 2.0
我想为一个非常相似的用例提供一个通用的例子:
使用案例:我有一个csv包括:
我需要执行一些转换,最终的csv需要看起来像
我需要这样做,因为这是由某些模型定义的模式,我需要我的最终数据可以与SQL批量插入和其他东西互操作 .
所以:
1)我使用spark.read读取原始csv并将其命名为“df” .
2)我对数据做了些什么 .
3)我使用这个脚本添加空列:
通过这种方式,您可以在加载csv后构建模式(如果必须对许多表执行此操作,也可以用于重新排序列) .
添加
column_name
时可以定义新的udf
:要使用UDF添加列: