首页 文章

将自定义函数应用于数据类型的数据帧列

提问于
浏览
0

我有一个名为'counts'的列的数据框,我想将自定义函数“do_something”应用于列的每个元素,即每个数组 . 我不想修改数据帧,我只想对列计数进行单独的操作 . 列的所有数组都具有相同的大小 .

+----------------------+---------------------------------------+
|id|              counts|
+----------------------+---------------------------------------+
|1|          [8.0, 2.0, 3.0|
|2|          [1.0, 6.0, 3.0|                
+----------------------+---------------------------------------+

当我这样做时:

df.select('counts').rdd.foreach(lambda x: do_something(x))

即使我尝试没有lambda它也会给出同样的错误 .

它在上面的行上失败了

Py4JJavaError Traceback(最近一次调用last)in()----> 1 df.select('counts') . rdd.foreach(lambda x:do_something(x))/usr/hdp/2.5.3.0-37/ foreach中的spark / python / pyspark / rdd.py(self,f)745 f(x)746 return iter([]) - > 747 self.mapPartitions(processPartition).count()#Force evaluation 748 749 def foreachPartition( self,f):/usr/hdp/2.5.3.0-37/spark/python/pyspark/rdd.py in count(self)1002 3 1003“”“ - > 1004 return self.mapPartitions(lambda i:[sum( 1 for _ in i)]) . sum()1005 1006 def stats(self):/ usr / hdp/2.5.3.0-37/spark/python/pyspark/rdd.py in sum(self)993 6.0 994“” “ - > 995返回self.mapPartitions(lambda x:[sum(x)]) . fold(0,operator.add)996 997 def count(self):/ usr / hdp/2.5.3.0-37/spark/ fold(self,zeroValue,op)中的python / pyspark / rdd.py 867#zeroValue提供给每个分区是唯一的,从提供的868#到最终的reduce调用 - > 869 vals = self.mapPartitions(func).collect ()870 return reduce(op,vals,zeroValue)871 /usr/hdp/2.5.3.0-37/spark/python/pyspa rk / rdd.py in collect(self)769“”“770 with SCCallSiteSync(self.context)as css: - > 771 port = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())772返回列表(_load_from_socket(port,self._jrdd_deserializer))773 /usr/hdp/2.5.3.0-37/spark/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py in call(self,* args )811 answer = self.gateway_client.send_command(command)812 return_value = get_return_value( - > 813 answer,self.gateway_client,self.target_id,self.name)814 815 for temp_arg in temp_args:/usr/hdp/2.5.3.0 -37 / spark / python / pyspark / sql / utils.py in deco(* a,** kw)43 def deco(* a,** kw):44 try:---> 45 return f(* a, ** kw)46除了py4j.protocol.Py4JJavaError为e:47 s = e.java_exception.toString()/usr/hdp/2.5.3.0-37/spark/python/lib/py4j-0.9-src.zip/py4j get_return_value中的/protocol.py(answer,gateway_client,target_id,name)306引发Py4JJavaError(307“调用{0}时发生错误{1} {2} . \ n” . - > 308格式(target_id,“ . ”,名称),值)309 else:310引发Py4JError(

虽然所有输入数组都具有相同的大小 .

big_list=[]
def do_something(i_array):
    outputs = custom_library(i_array) # takes as input an array and returns 3 new lists
    big_list.extend(outputs)

1 回答

  • 2

    你的 UDF 修改了一个python对象,即:

    • 数据帧的外观,即使函数工作,你也不会将它返回到数据帧的行

    • huge,它的元素数量至少是数据帧中行数的三倍

    您可以尝试这样做:

    def do_something(i_array):
        outputs = custom_library(i_array)
        return outputs
    
    import pyspark.sql.functions as psf
    do_something_udf = psf.udf(do_something, ArrayType(ArrayType(DoubleType()))
    

    DoubleType() 或您返回的任何类型

    df.withColumn("outputs", psf.explode(do_something_udf("count")))
    

    你的行数是 df 的三倍

相关问题