首页 文章

迭代保存到Pyspark中的新DataFrame

提问于
浏览
-1

我正在基于3种不同的PySpark DataFrame进行计算 .

这个脚本在它执行计算的意义上工作,但是,我努力与所述计算的结果一起正常工作 .

import sys
import numpy as np
from pyspark import SparkConf, SparkContext, SQLContext
sc = SparkContext("local")
sqlContext = SQLContext(sc)

# Dummy Data
df = sqlContext.createDataFrame([[0,1,0,0,0],[1,1,0,0,1],[0,0,1,0,1],[1,0,1,1,0],[1,1,0,0,0]], ['p1', 'p2', 'p3', 'p4', 'p5'])
df.show()
+---+---+---+---+---+
| p1| p2| p3| p4| p5|
+---+---+---+---+---+
|  0|  1|  0|  0|  0|
|  1|  1|  0|  0|  1|
|  0|  0|  1|  0|  1|
|  1|  0|  1|  1|  0|
|  1|  1|  0|  0|  0|
+---+---+---+---+---+

# Values
values = sqlContext.createDataFrame([(0,1,'p1'),(None,1,'p2'),(0,0,'p3'),(None,0, 'p4'),(1,None,'p5')], ('f1', 'f2','index'))
values.show()
+----+----+-----+
|  f1|  f2|index|
+----+----+-----+
|   0|   1|   p1|
|null|   1|   p2|
|   0|   0|   p3|
|null|   0|   p4|
|   1|null|   p5|
+----+----+-----+

# Weights
weights = sqlContext.createDataFrame([(4,3,'p1'),(None,1,'p2'),(2,2,'p3'),(None, 3, 'p4'),(3,None,'p5')], ('f1', 'f2','index'))
weights.show()
+----+----+-----+
|  f1|  f2|index|
+----+----+-----+
|   4|   3|   p1|
|null|   1|   p2|
|   2|   2|   p3|
|null|   3|   p4|
|   3|null|   p5|
+----+----+-----+

# Function: it sums the vector W for the values of Row equal to the value of V and then divide by the length of V.
# If there a no similarities between Row and V outputs 0
def W_sum(row,v,w):
    if len(w[row==v])>0:
        return float(np.sum(w[row==v])/len(w))
    else:
        return 0.0

对于每个列和Data中的每一行,应用上述函数 .

# We iterate over the columns of Values (except the last one called index)
for val in values.columns[:-1]:
    # we filter the data to work only with the columns that are defined for the selected Value
    defined_col = [i[0] for i in values.where(F.col(val) >= 0).select(values.index).collect()]
    # we select only the useful columns
    df_select= df.select(defined_col)
    # we retrieve the reference value and weights
    V = np.array(values.where(values.index.isin(defined_col)).select(val).collect()).flatten()
    W = np.array(weights.where(weights.index.isin(defined_col)).select(val).collect()).flatten()
    W_sum_udf = F.udf(lambda row: W_sum(row, V, W), FloatType())
    df_select.withColumn(val, W_sum_udf(F.array(*(F.col(x) for x in df_select.columns))))

这给出了:

+---+---+---+---+---+---+
| p1| p2| p3| p4| p5| f1|
+---+---+---+---+---+---+
|  0|  1|  0|  0|  0|2.0|
|  1|  1|  0|  0|  1|1.0|
|  0|  0|  1|  0|  1|2.0|
|  1|  0|  1|  1|  0|0.0|
|  1|  1|  0|  0|  0|0.0|
+---+---+---+---+---+---+

它按照我的要求将列添加到切片的DataFrame中 . 问题是我宁愿将数据收集到一个新的数据中,我可以在最后访问该数据来查询结果 .
它可以像在Pandas中那样在PySpark中增长(有效地)一个DataFrame吗?

Edit to make my goal clearer:
理想情况下,我会得到一个只有计算列的DataFrame,如下所示:

+---+---+
    | f1| f2|
    +---+---+
    |2.0|1.0|
    |1.0|2.0|
    |2.0|0.0|
    |0.0|0.0|
    |0.0|2.0|
    +---+---+

1 回答

  • 2

    您的问题存在一些问题......

    首先,你的 for 循环会产生错误,因为最后一行中的 df_select 没有定义;最后也没有任务(它产生了什么?) .

    假设 df_select 实际上是你的 subsubsample 数据帧,之前定义了一些行,并且你的最后一行是类似的

    new_df = subsubsample.withColumn(val, W_sum_udf(F.array(*(F.col(x) for x in subsubsample.columns))))
    

    然后你的问题开始越来越明显了 . 以来

    values.columns[:-1]
    #  ['f1', 'f2']
    

    整个循环的结果就是这样

    +---+---+---+---+---+ 
    | p1| p2| p3| p4| f2| 
    +---+---+---+---+---+
    |  0|  1|  0|  0|1.0|
    |  1|  1|  0|  0|2.0|
    |  0|  0|  1|  0|0.0|
    |  1|  0|  1|  1|0.0|
    |  1|  1|  0|  0|2.0|
    +---+---+---+---+---+
    

    即仅包含 f2 列(自然,因为 f1 的结果只是被覆盖) .

    现在,正如我所说,假设情况是这样的,并且您的问题实际上是如何将两列 f1f2 放在一起而不是在不同的数据帧中,您可以忘记 subsubsample 并将列追加到您的初始 df ,之后可能会丢失不需要的人:

    init_cols = df.columns
    init_cols
    #  ['p1', 'p2', 'p3', 'p4', 'p5']
    
    new_df = df
    
    for val in values.columns[:-1]:
        # we filter the data to work only with the columns that are defined for the selected Value
        defined_col = [i[0] for i in values.where(F.col(val) >= 0).select(values.index).collect()]
        # we retrieve the reference value and weights
        V = np.array(values.where(values.index.isin(defined_col)).select(val).collect()).flatten()
        W = np.array(weights.where(weights.index.isin(defined_col)).select(val).collect()).flatten()
        W_sum_udf = F.udf(lambda row: W_sum(row, V, W), FloatType())
        new_df = new_df.withColumn(val, W_sum_udf(F.array(*(F.col(x) for x in defined_col)))) # change here
    
    # drop initial columns:
    for i in init_cols:
      new_df = new_df.drop(i)
    

    结果 new_df 将是:

    +---+---+ 
    | f1| f2| 
    +---+---+
    |2.0|1.0| 
    |1.0|2.0|
    |2.0|0.0|
    |0.0|0.0|
    |0.0|2.0|
    +---+---+
    

    更新(注释后):要强制 W_sum 函数中的除法为浮点数,请使用:

    from __future__ import division
    

    new_df 现在将是:

    +---------+----+ 
    |       f1|  f2|
    +---------+----+ 
    |      2.0| 1.5|
    |1.6666666|2.25|
    |2.3333333|0.75|
    |      0.0|0.75|
    |0.6666667|2.25|
    +---------+----+
    

    f2 完全一致,根据你的评论 .

相关问题