Pyspark - TypeError: 'float' object is not subscriptable when computing the mean with reduceByKey


My "asdasd.csv" file has the following structure.

Index,Arrival_Time,Creation_Time,x,y,z,User,Model,Device,gt
0,1424696633908,1424696631913248572,-5.958191,0.6880646,8.135345,a,nexus4,nexus4_1,stand
1,1424696633909,1424696631918283972,-5.95224,0.6702118,8.136536,a,nexus4,nexus4_1,stand
2,1424696633918,1424696631923288855,-5.9950867,0.6535491999999999,8.204376,a,nexus4,nexus4_1,stand
3,1424696633919,1424696631928385290,-5.9427185,0.6761626999999999,8.128204,a,nexus4,nexus4_1,stand

OK, I get the following {key, value} tuples to operate on.

#                                 x           y        z
[(('a', 'nexus4', 'stand'), ((-5.958191, 0.6880646, 8.135345)))]
#           part A (key)               part B (value)
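
For reference, here is a minimal plain-Python sketch (illustration only, outside Spark) of how one CSV row is turned into that key/value pair by the map step shown further below:

# Illustration: parsing one CSV row into the (key, value) pair above.
row = "0,1424696633908,1424696631913248572,-5.958191,0.6880646,8.135345,a,nexus4,nexus4_1,stand"
fields = row.split(",")
key = (fields[6], fields[7], fields[9])                         # (User, Model, gt)
value = (float(fields[3]), float(fields[4]), float(fields[5]))  # (x, y, z)
print((key, value))
# (('a', 'nexus4', 'stand'), (-5.958191, 0.6880646, 8.135345))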

My code for computing the mean is below; I have to compute the average of each column (x, y and z) for each key.

rdd_ori = sc.textFile("asdasd.csv") \
        .map(lambda x: ((x.split(",")[6], x.split(",")[7], x.split(",")[9]),(float(x.split(",")[3]),float(x.split(",")[4]),float(x.split(",")[5]))))

meanRDD = rdd_ori.mapValues(lambda x: (x,1)) \
            .reduceByKey(lambda a, b: (a[0][0] + b[0][0], a[0][1] + b[0][1], a[0][2] + b[0][2], a[1] + b[1]))\
            .mapValues(lambda a : (a[0]/a[3], a[1]/a[3],a[2]/a[3]))

My problem is that I tried this code and it works fine on the other PC I used to develop it (PySpark Py3).

Here is an example where this code works correctly:

[screenshot of the correct output]

But I don't understand why I get this error here. The important part is:

---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
in <module>()
      9 # sum_1 = count_.reduceByKey(lambda x, y: (x[0][0] + y[0][0], x[0][1] + y[0][1], x[0][2] + y[0][2]))
     10
---> 11 print(meanRDD.take(1))

/opt/spark/current/python/pyspark/rdd.py in take(self, num)
   1341
   1342             p = range(partsScanned, min(partsScanned + numPartsToTry, totalParts))
-> 1343             res = self.context.runJob(self, takeUpToNumLeft, p)
   1344
   1345             items += res

/opt/spark/current/python/pyspark/context.py in runJob(self, rdd, partitionFunc, partitions, allowLocal)
    990         # SparkContext#runJob.
    991         mappedRDD = rdd.mapPartitions(partitionFunc)
--> 992         port = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, partitions)
    993         return list(_load_from_socket(port, mappedRDD._jrdd_deserializer))
    994

/opt/spark/current/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1131         answer = self.gateway_client.send_command(command)
   1132         return_value = get_return_value(
-> 1133             answer, self.gateway_client, self.target_id, self.name)
   1134
   1135         for temp_arg in temp_args:

/opt/spark/current/python/pyspark/sql/utils.py in deco(*a, **kw)
     61     def deco(*a, **kw):
     62         try:
---> 63             return f(*a, **kw)
     64         except py4j.protocol.Py4JJavaError as e:
     65             s = e.java_exception.toString()

/opt/spark/current/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    317                 raise Py4JJavaError(
    318                     "An error occurred while calling {0}{1}{2}.\n".
--> 319                     format(target_id, ".", name), value)
    320             else:
    321                 raise Py4JError(

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 127.0 failed 1 times, most recent failure: Lost task 0.0 in stage 127.0 (TID 102, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/opt/spark/current/python/lib/pyspark.zip/pyspark/worker.py", line 177, in main
    process()
  File "/opt/spark/current/python/lib/pyspark.zip/pyspark/worker.py", line 172, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/opt/spark/current/python/pyspark/rdd.py", line 2423, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "/opt/spark/current/python/pyspark/rdd.py", line 2423, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "/opt/spark/current/python/pyspark/rdd.py", line 346, in func
    return f(iterator)
  File "/opt/spark/current/python/pyspark/rdd.py", line 1842, in combineLocally
    merger.mergeValues(iterator)
  File "/opt/spark/current/python/lib/pyspark.zip/pyspark/shuffle.py", line 238, in mergeValues
    d[k] = comb(d[k], v) if k in d else creator(v)
  File "", line 3, in <lambda>
TypeError: 'float' object is not subscriptable

1 Answer


    Here's how reduceByKey works. I am taking your example for illustration, i.e. with the following data that you pass to reduceByKey:

    #                                 x           y        z
    [(('a', 'nexus4', 'stand'), ((-5.958191, 0.6880646, 8.135345), 1))]
    #           part A (key)               part B (value)       counter
    

    Let me walk through it step by step.

    After executing the following mapValues function

    rdd_ori.mapValues(lambda x: (x,1))
    

    the RDD data looks like

    ((u'a', u'nexus4', u'stand'), ((-5.9427185, 0.6761626999999999, 8.128204), 1))
    ((u'a', u'nexus4', u'stand'), ((-5.958191, 0.6880646, 8.135345), 1))
    ((u'a', u'nexus4', u'stand'), ((-5.95224, 0.6702118, 8.136536), 1))
    ((u'a', u'nexus4', u'stand'), ((-5.9950867, 0.6535491999999999, 8.204376), 1))
    

    So when reduceByKey is called as

    .reduceByKey(lambda a, b: (a[0][0] + b[0][0], a[0][1] + b[0][1], a[0][2] + b[0][2], a[1] + b[1]))
    

    all the rows with the same key are grouped and their values are passed to its lambda function.

    Since in your case all the keys are the same, the values are passed to the a and b variables of that lambda in successive iterations.

    In the first iteration, a is ((-5.958191, 0.6880646, 8.135345), 1) and b is ((-5.95224, 0.6702118, 8.136536), 1), so the computation part (a[0][0] + b[0][0], a[0][1] + b[0][1], a[0][2] + b[0][2], a[1] + b[1]) is correct and goes through.

    In the second iteration, a is the output of (a[0][0] + b[0][0], a[0][1] + b[0][1], a[0][2] + b[0][2], a[1] + b[1]), i.e. the flat tuple (-11.910430999999999, 1.3582764, 16.271881, 2), while b is the next value.

    So if you look at the format of that data, there is no such a[0][0] in a; you can only get a[0], a[1] and so on. So that's the issue, and that's what the error message is telling you too:

    TypeError: 'float' object is not subscriptable
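
    To see this concretely, the same failure can be reproduced in plain Python, outside Spark, by folding the four (value, counter) pairs shown above with the original reduce function (a sketch for illustration only):

    from functools import reduce

    # The four ((x, y, z), counter) pairs produced by mapValues(lambda x: (x, 1))
    values = [
        ((-5.958191, 0.6880646, 8.135345), 1),
        ((-5.95224, 0.6702118, 8.136536), 1),
        ((-5.9950867, 0.6535491999999999, 8.204376), 1),
        ((-5.9427185, 0.6761626999999999, 8.128204), 1),
    ]

    # The first step works; the second step receives a flat 4-tuple of floats
    # as a, so a[0][0] indexes into a float and raises the TypeError.
    reduce(lambda a, b: (a[0][0] + b[0][0], a[0][1] + b[0][1],
                         a[0][2] + b[0][2], a[1] + b[1]), values)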

    The solution is to format the data so that a can be accessed as a[0][0], which you get if you write your reduceByKey in the following format:

    .reduceByKey(lambda a, b: ((a[0][0] + b[0][0], a[0][1] + b[0][1], a[0][2] + b[0][2]), a[1] + b[1]))
    
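    Applied to the same plain-Python values list from the sketch above, this corrected reducer keeps the (sums, counter) shape instead of a flat tuple (again just an illustration outside Spark):

    # Reusing `values` and `reduce` from the previous sketch: the three sums
    # stay nested in their own tuple, with the counter alongside them.
    totals = reduce(lambda a, b: ((a[0][0] + b[0][0], a[0][1] + b[0][1],
                                   a[0][2] + b[0][2]), a[1] + b[1]), values)
    print(totals)
    # ((-23.848236199999995, 2.6879882999999998, 32.604461), 4)  (up to float rounding)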

    But that will cause trouble in your last mapValues function

    .mapValues(lambda a : (a[0]/a[3], a[1]/a[3],a[2]/a[3]))
    

    because your value, i.e. a in that lambda function, is ((-23.848236199999995, 2.6879882999999998, 32.604461), 4): a[0] means (-23.848236199999995, 2.6879882999999998, 32.604461), a[1] means 4, and there is nothing beyond that, so you will run into

    IndexError: tuple index out of range
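
    The same can be checked in plain Python with the totals tuple quoted above: it only has index 0 (the sums) and index 1 (the counter), so anything like a[3] is out of range. A tiny illustrative check:

    totals = ((-23.848236199999995, 2.6879882999999998, 32.604461), 4)
    totals[1]   # 4, the counter
    totals[3]   # IndexError: tuple index out of range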

    So your last mapValues should be

    .mapValues(lambda a : (a[0][0]/a[1], a[0][1]/a[1],a[0][2]/a[1]))
    

    So overall, the following code should work for you:

    rdd_ori = sc.textFile("asdasd.csv") \
        .map(lambda x: ((x.split(",")[6], x.split(",")[7], x.split(",")[9]),(float(x.split(",")[3]),float(x.split(",")[4]),float(x.split(",")[5]))))
    
    meanRDD = rdd_ori.mapValues(lambda x: (x, 1)) \
        .reduceByKey(lambda a, b: ((a[0][0] + b[0][0], a[0][1] + b[0][1], a[0][2] + b[0][2]), a[1] + b[1]))\
        .mapValues(lambda a : (a[0][0]/a[1], a[0][1]/a[1],a[0][2]/a[1]))
    

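    As a quick sanity check (a hypothetical session; it assumes asdasd.csv has no header row, or that the header has already been filtered out before the float() casts), taking one element now gives the per-key means:

    print(meanRDD.take(1))
    # For the four sample rows above this is approximately
    # [(('a', 'nexus4', 'stand'), (-5.96206, 0.672, 8.151115))]
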
    I hope I have explained it well.
