
PySpark dataframes: how to apply a scipy.optimize function by group


I have a piece of code that works well, but it does the processing with pandas dataframe groups. Because the file is very large (about 70 million groups), I need to convert the code to use PySpark dataframes. Here is the original code using a pandas dataframe and small example data:

import pandas as pd
import numpy as np
from scipy.optimize import minimize

df = pd.DataFrame({
'y0': np.random.randn(20),
'y1': np.random.randn(20),
'x0': np.random.randn(20), 
'x1': np.random.randn(20),
'grpVar': ['a', 'b'] * 10})

# Starting values
startVal = np.ones(2)*(1/2)

# Constraint: sum of coefficients = 1
cons = ({'type':'eq', 'fun': lambda x: 1 - sum(x)})

# Bounds on coefficients
bnds = tuple([0,1] for x in startVal)

# Define a function to calculate sum of squared differences
def SumSqDif(a, df):
    return np.sum((df['y0'] - a[0]*df['x0'])**2 + (df['y1'] - a[1]*df['x1'])**2)

# Define a function to call minimize function 
def RunMinimize(data, startVal, bnds, cons):
    ResultByGrp = minimize(SumSqDif, startVal, method='SLSQP',
                           bounds=bnds, constraints=cons, args=(data,))
    return ResultByGrp.x

# Do the calculation by applying the function by group:
# Create GroupBy object
grp_grpVar = df.groupby('grpVar')

Results = grp_grpVar.apply(RunMinimize, startVal=startVal, bnds=bnds, cons=cons)
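
For reference, Results then holds one optimized coefficient array (ResultByGrp.x) per value of grpVar. A minimal sketch of inspecting it, assuming groupby.apply returns a pandas Series of coefficient arrays as it typically does for array-valued functions (the name coef_df is just for illustration):

# Results is indexed by grpVar, with one coefficient array per group
print(Results)

# optionally expand it into one column per coefficient
coef_df = pd.DataFrame(Results.tolist(), index=Results.index, columns=['a0', 'a1'])
print(coef_df)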

Now I am trying to use PySpark dataframes. I converted the pandas dataframe to a PySpark dataframe in order to test the code.

sdf = sqlContext.createDataFrame(df)
type(sdf)
#  <class 'pyspark.sql.dataframe.DataFrame'>

# Create GroupBy object
Sgrp_grpVar = sdf.groupby('grpVar')

# Redefine functions
def sSumSqDif(a, sdf):
    return np.sum((sdf['y0'] - a[0]*sdf['x0'])**2 + (sdf['y1'] - a[1]*sdf['x1'])**2)

def sRunMinimize(data=sdf, startVal=startVal, bnds=bnds, cons=cons):
    ResultByGrp = minimize(sSumSqDif, startVal, method='SLSQP',
                           bounds=bnds, constraints=cons, args=(data,))
    return ResultByGrp.x

from pyspark.sql.functions import UserDefinedFunction
from pyspark.sql.types import DoubleType
from pyspark.sql.types import StringType

udf = UserDefinedFunction(sRunMinimize, StringType())

Results = Sgrp_grpVar.agg(sRunMinimize())

However, when I try to define the user-defined function udf, I get the error below. Any help correcting my mistake or suggesting an alternative approach is greatly appreciated.

udf = UserDefinedFunction(sRunMinimize, StringType())

Traceback (most recent call last):
  File "", line 1, in
  File "/usr/hdp/current/spark2-client/python/pyspark/sql/functions.py", line 1760, in __init__
    self._judf = self._create_judf(name)
.......

1 Answer


    You're trying to write a User Defined Aggregate Function, which can't be done in PySpark, see https://stackoverflow.com/a/40030740.

    What you can write instead is a UDF on the data within each group, collected as a list.

    First the set-up:

    import pandas as pd 
    import numpy as np 
    from scipy.optimize import minimize
    import pyspark.sql.functions as psf
    from pyspark.sql.types import *
    
    df = pd.DataFrame({
        'y0': np.random.randn(20),
        'y1': np.random.randn(20),
        'x0': np.random.randn(20), 
        'x1': np.random.randn(20),
        'grpVar': ['a', 'b'] * 10})
    sdf = sqlContext.createDataFrame(df)
    
    # Starting values
    startVal = np.ones(2)*(1/2)
    # Constraint: sum of coefficients = 1
    cons = ({'type':'eq', 'fun': lambda x: 1 - sum(x)})
    # Bounds on coefficients
    bnds = tuple([0,1] for x in startVal)
    

    We'll broadcast these variables since we need to call them on every row of the aggregated dataframe; broadcasting copies the values to every node so they don't have to be fetched from the driver:

    sc.broadcast(startVal)
    sc.broadcast(bnds)
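
    A side note: as written, the Broadcast handles returned by sc.broadcast are not kept. A minimal sketch of the usual pattern, with purely illustrative names startVal_b and bnds_b, captures the handle and reads the value back through .value on the executors:

    startVal_b = sc.broadcast(startVal)   # keep the Broadcast handle
    bnds_b = sc.broadcast(bnds)
    # inside a UDF the broadcast values are then read as startVal_b.value and bnds_b.value
    print(startVal_b.value)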
    

    Let's aggregate the data using collect_list. We'll change the structure of the data around so that we only have one column (you could collect each column into a distinct column instead, but then you'd have to modify the way you pass the data to the function; a sketch of that alternative follows the schema below):

    Sgrp_grpVar = sdf\
        .groupby('grpVar')\
        .agg(psf.collect_list(psf.struct("y0", "y1", "x0", "x1")).alias("data"))
    Sgrp_grpVar.printSchema()
    
        root
         |-- grpVar: string (nullable = true)
         |-- data: array (nullable = true)
         |    |-- element: struct (containsNull = true)
         |    |    |-- y0: double (nullable = true)
         |    |    |-- y1: double (nullable = true)
         |    |    |-- x0: double (nullable = true)
         |    |    |-- x1: double (nullable = true)
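
    For comparison, a minimal sketch of the per-column alternative mentioned above (the name Sgrp_cols is just for illustration): each column is collected into its own list, and the UDF would then have to accept four list arguments instead of one list of structs.

    Sgrp_cols = sdf\
        .groupby('grpVar')\
        .agg(psf.collect_list("y0").alias("y0"),
             psf.collect_list("y1").alias("y1"),
             psf.collect_list("x0").alias("x0"),
             psf.collect_list("x1").alias("x1"))
    Sgrp_cols.printSchema()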
    

    Now we can create the UDF. The returned data type is a bit too complex for PySpark, which doesn't support numpy arrays, so we need to change it a little:

    def sSumSqDif(a, data):
        return np.sum(
            (data['y0'] - a[0]*data['x0'])**2 \
            + (data['y1'] - a[1]*data['x1'])**2)
    
    def sRunMinimize(data, startVal=startVal, bnds=bnds, cons=cons):
        # rebuild a pandas dataframe from the list of collected rows (struct order: y0, y1, x0, x1)
        data = pd.DataFrame(data, columns=["y0", "y1", "x0", "x1"])
        ResultByGrp = minimize(sSumSqDif, startVal, method='SLSQP',
                               bounds=bnds, constraints=cons, args=(data,))
        return ResultByGrp.x.tolist()
    
    sRunMinimize_udf = lambda startVal, bnds, cons: psf.udf(
        lambda data: sRunMinimize(data, startVal, bnds, cons), 
        ArrayType(DoubleType())
    )
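
    Before wiring the UDF into Spark, it can help to sanity-check sRunMinimize locally. A minimal sketch using one group of the original pandas df, with the rows passed as plain tuples in place of the collected structs (the name rows_a is just for illustration):

    rows_a = [tuple(r) for r in df.loc[df.grpVar == 'a', ["y0", "y1", "x0", "x1"]].itertuples(index=False)]
    print(sRunMinimize(rows_a, startVal, bnds, cons))   # two coefficients that sum to 1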
    

    We can now apply this function to the data collected within each group:

    Results = Sgrp_grpVar.select(
        "grpVar", 
        sRunMinimize_udf(startVal, bnds, cons)("data").alias("res")
    )
    Results.show(truncate=False)
    
        +------+-----------------------------------------+
        |grpVar|res                                      |
        +------+-----------------------------------------+
        |b     |[0.4073139282953772, 0.5926860717046227] |
        |a     |[0.8275186444565927, 0.17248135554340727]|
        +------+-----------------------------------------+
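
    If separate coefficient columns are more convenient downstream, the res array can be split by index; a small sketch (the column names coef0 and coef1 are just for illustration):

    Results.select(
        "grpVar",
        Results.res[0].alias("coef0"),
        Results.res[1].alias("coef1")
    ).show()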
    

    But I don't think PySpark is the right tool for this.
