I have code that works great using pandas dataframe groups for processing, but because the file is very large (about 70 million groups) I need to convert the code to use PySpark dataframes. Here is the original code using a pandas dataframe and small example data:
import pandas as pd
import numpy as np
from scipy.optimize import minimize
df = pd.DataFrame({
'y0': np.random.randn(20),
'y1': np.random.randn(20),
'x0': np.random.randn(20),
'x1': np.random.randn(20),
'grpVar': ['a', 'b'] * 10})
# Starting values
startVal = np.ones(2)*(1/2)
#Constraint Sum of coefficients = 0
cons = ({'type':'eq', 'fun': lambda x: 1 - sum(x)})
# Bounds on coefficients
bnds = tuple([0,1] for x in startVal)
# Define a function to calculate sum of squared differences
def SumSqDif(a, df):
    return np.sum((df['y0'] - a[0]*df['x0'])**2 + (df['y1'] - a[1]*df['x1'])**2)
# Define a function to call minimize function
def RunMinimize(data, startVal, bnds, cons):
    ResultByGrp = minimize(SumSqDif, startVal, method='SLSQP',
                           bounds=bnds, constraints=cons, args=(data,))
    return ResultByGrp.x
# Do the calculation by applying the function by group:
# Create GroupBy object
grp_grpVar = df.groupby('grpVar')
Results = grp_grpVar.apply(RunMinimize, startVal=startVal, bnds=bnds, cons=cons)
Now I am trying to use pySpark dataframes. I converted the pandas dataframe to a pyspark dataframe in order to test the code:
sdf = sqlContext.createDataFrame(df)
type(sdf)
# <class 'pyspark.sql.dataframe.DataFrame'>
# Create GroupBy object
Sgrp_grpVar = sdf.groupby('grpVar')
# Redefine functions
def sSumSqDif(a, sdf):
    return np.sum((sdf['y0'] - a[0]*sdf['x0'])**2 + (sdf['y1'] - a[1]*sdf['x1'])**2)
def sRunMinimize(data=sdf, startVal=startVal, bnds=bnds, cons=cons):
    ResultByGrp = minimize(sSumSqDif, startVal, method='SLSQP',
                           bounds=bnds, constraints=cons, args=(data,))
    return ResultByGrp.x
from pyspark.sql.functions import UserDefinedFunction
from pyspark.sql.types import DoubleType
from pyspark.sql.types import StringType
udf = UserDefinedFunction(sRunMinimize, StringType())
Results = Sgrp_grpVar.agg(sRunMinimize())
However, after I tried to define the user defined function udf, I got the errors below. Any help correcting my mistakes or suggesting an alternative approach is much appreciated.
udf = UserDefinedFunction(sRunMinimize, StringType())
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/hdp/current/spark2-client/python/pyspark/sql/functions.py", line 1760, in __init__
    self._judf = self._create_judf(name).......
1 Answer
You're trying to write a User Defined Aggregate Function, which can't be done in pyspark; see https://stackoverflow.com/a/40030740 .
What you can write instead is a UDF on the data within each group, collected as a list. First the set-up:
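A minimal sketch of the set-up (the psf alias and the reuse of the question's cost function on a pandas DataFrame rebuilt inside the UDF are assumptions carried through the later snippets):
import pandas as pd
import numpy as np
from scipy.optimize import minimize
import pyspark.sql.functions as psf

# optimisation inputs, unchanged from the pandas version
startVal = np.ones(2)*(1/2)
cons = ({'type': 'eq', 'fun': lambda x: 1 - sum(x)})
bnds = tuple([0, 1] for x in startVal)

# cost function: sum of squared residuals over both series
def sSumSqDif(a, data):
    return np.sum((data['y0'] - a[0]*data['x0'])**2
                  + (data['y1'] - a[1]*data['x1'])**2)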
We'll broadcast these variables, since we need them on every row of the aggregated dataframe; broadcasting copies the values to every node so the executors don't have to fetch them from the driver:
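A sketch, assuming sc is the active SparkContext. Note that cons holds a lambda, which the plain-pickle broadcast serializer cannot handle, so it is left to ride along in the UDF closure instead:
# one read-only copy per executor instead of repeated fetches from the driver
startVal_bc = sc.broadcast(startVal)
bnds_bc = sc.broadcast(bnds)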
Let's aggregate the data using collect_list. We'll change the structure of the data so that there is only one column (you could collect each column into a distinct column, but then you'd have to modify the way you pass the data to the function):
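Something along these lines, collecting each group's rows into a single array-of-structs column (the name sdf_agg is arbitrary):
# pack the four numeric columns into one struct per row, then gather all
# rows of a group into a single list-valued column called 'data'
sdf_agg = sdf.groupBy('grpVar').agg(
    psf.collect_list(psf.struct('y0', 'y1', 'x0', 'x1')).alias('data'))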
We can now create the UDF. The returned data type is too complicated for pyspark, which doesn't support numpy arrays, so we'll need to change it a bit:
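A sketch of such a UDF: it rebuilds a small pandas DataFrame from one group's collected structs, runs the same SLSQP minimisation (using the broadcast variables from the previous snippet), and returns a plain list of floats instead of a numpy array:
from pyspark.sql.types import ArrayType, DoubleType

def sRunMinimize(rows):
    # list of Row structs for one group -> small pandas DataFrame
    data = pd.DataFrame([r.asDict() for r in rows])
    ResultByGrp = minimize(sSumSqDif, startVal_bc.value, method='SLSQP',
                           bounds=bnds_bc.value, constraints=cons,
                           args=(data,))
    # numpy array -> list of Python floats, a type pyspark can return
    return [float(v) for v in ResultByGrp.x]

sRunMinimize_udf = psf.udf(sRunMinimize, ArrayType(DoubleType()))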
We can now apply this function to the collected data in each group:
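For example (the output column name coefs is arbitrary):
Results = sdf_agg.select('grpVar', sRunMinimize_udf('data').alias('coefs'))
Results.show(truncate=False)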
But I don't think pyspark is the right tool for this.