首页 文章

将RDD添加到DataFrame列PySpark

提问于
浏览
0

我想创建一个包含两个RDD列的Dataframe . 第一个是从CSV获得的RDD,第二个是具有每行的聚类预测的另一个RDD .

我的架构是:

customSchema = StructType([ \
StructField("Area", FloatType(), True), \
StructField("Perimeter", FloatType(), True), \
StructField("Compactness", FloatType(), True), \
StructField("Lenght", FloatType(), True), \
StructField("Width", FloatType(), True), \
StructField("Asymmetry", FloatType(), True), \
StructField("KernelGroove", FloatType(), True)])

映射我的rdd并创建DataFrame:

FN2 = rdd.map(lambda x: (float(x[0]), float(x[1]),float(x[2]),float(x[3]),float(x[4]),float(x[5]),float(x[6])))
 df = sqlContext.createDataFrame(FN2, customSchema)

我的集群预测:

result = Kmodel.predict(rdd)

因此,总而言之,我希望在我的DataFrame中包含我的CSV行和最后的群集预测 .

我试图用.WithColumn()添加一个新列,但我什么也没得到 .

谢谢 .

1 回答

  • 0

    如果两个数据框上都有公共字段,则使用密钥加入,否则创建唯一的Id并加入两个数据帧以在单个数据帧中获取CSV行及其簇预测

    Scala代码为每一行生成一个唯一的id,尝试转换为pyspark . 您需要生成增加的行ID并使用行ID连接它们

    import org.apache.spark.sql.types.{StructType, StructField, LongType}
    val df = sc.parallelize(Seq(("abc", 2), ("def", 1), ("hij", 3))).toDF("word", "count")
    val wcschema = df.schema
    val inputRows = df.rdd.zipWithUniqueId.map{
       case (r: Row, id: Long) => Row.fromSeq(id +: r.toSeq)}
    val wcID = sqlContext.createDataFrame(inputRows, StructType(StructField("id", LongType, false) +: wcschema.fields))
    

    或使用SQL查询

    val tmpTable1 = sqlContext.sql("select row_number() over (order by count) as rnk,word,count from wordcount")
    tmpTable1.show()
    

相关问题