My "asdasd.csv" file has the following structure:
Index,Arrival_Time,Creation_Time,x,y,z,User,Model,Device,gt
0,1424696633908,1424696631913248572,-5.958191,0.6880646,8.135345,a,nexus4,nexus4_1,stand
1,1424696633909,1424696631918283972,-5.95224,0.6702118,8.136536,a,nexus4,nexus4_1,stand
2,1424696633918,1424696631923288855,-5.9950867,0.6535491999999999,8.204376,a,nexus4,nexus4_1,stand
3,1424696633919,1424696631928385290,-5.9427185,0.6761626999999999,8.128204,a,nexus4,nexus4_1,stand
From it I build the following (key, value) tuples to work with:
#                                x          y          z
[(('a', 'nexus4', 'stand'), (-5.958191, 0.6880646, 8.135345))]
#        part A (key)             part B (value)
My code to compute the mean is below; I have to compute the per-key average of each of the columns X, Y, and Z:
rdd_ori = sc.textFile("asdasd.csv") \
.map(lambda x: ((x.split(",")[6], x.split(",")[7], x.split(",")[9]),(float(x.split(",")[3]),float(x.split(",")[4]),float(x.split(",")[5]))))
meanRDD = rdd_ori.mapValues(lambda x: (x,1)) \
.reduceByKey(lambda a, b: (a[0][0] + b[0][0], a[0][1] + b[0][1], a[0][2] + b[0][2], a[1] + b[1]))\
.mapValues(lambda a : (a[0]/a[3], a[1]/a[3],a[2]/a[3]))
My problem is that I tried this code and it works fine on the other PC I used to develop it (PySpark, Python 3); there, this code runs correctly. But here I get the following error (the important part is the final TypeError):
---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<ipython-input> in <module>()
      9 # sum_1 = count_.reduceByKey(lambda x, y: (x[0][0] + y[0][0], x[0][1] + y[0][1], x[0][2] + y[0][2]))
     10
---> 11 print(meanRDD.take(1))

/opt/spark/current/python/pyspark/rdd.py in take(self, num)
   1341
   1342             p = range(partsScanned, min(partsScanned + numPartsToTry, totalParts))
-> 1343             res = self.context.runJob(self, takeUpToNumLeft, p)
   1344
   1345             items += res

/opt/spark/current/python/pyspark/context.py in runJob(self, rdd, partitionFunc, partitions, allowLocal)
    990         # SparkContext#runJob.
    991         mappedRDD = rdd.mapPartitions(partitionFunc)
--> 992         port = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, partitions)
    993         return list(_load_from_socket(port, mappedRDD._jrdd_deserializer))
    994

/opt/spark/current/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1131         answer = self.gateway_client.send_command(command)
   1132         return_value = get_return_value(
-> 1133             answer, self.gateway_client, self.target_id, self.name)
   1134
   1135         for temp_arg in temp_args:

/opt/spark/current/python/pyspark/sql/utils.py in deco(*a, **kw)
     61     def deco(*a, **kw):
     62         try:
---> 63             return f(*a, **kw)
     64         except py4j.protocol.Py4JJavaError as e:
     65             s = e.java_exception.toString()

/opt/spark/current/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    317                 raise Py4JJavaError(
    318                     "An error occurred while calling {0}{1}{2}.\n".
--> 319                     format(target_id, ".", name), value)
    320             else:
    321                 raise Py4JError(

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 127.0 failed 1 times,
most recent failure: Lost task 0.0 in stage 127.0 (TID 102, localhost, executor driver):
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/opt/spark/current/python/lib/pyspark.zip/pyspark/worker.py", line 177, in main
    process()
  File "/opt/spark/current/python/lib/pyspark.zip/pyspark/worker.py", line 172, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/opt/spark/current/python/pyspark/rdd.py", line 2423, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "/opt/spark/current/python/pyspark/rdd.py", line 2423, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "/opt/spark/current/python/pyspark/rdd.py", line 346, in func
    return f(iterator)
  File "/opt/spark/current/python/pyspark/rdd.py", line 1842, in combineLocally
    merger.mergeValues(iterator)
  File "/opt/spark/current/python/lib/pyspark.zip/pyspark/shuffle.py", line 238, in mergeValues
    d[k] = comb(d[k], v) if k in d else creator(v)
  File "<ipython-input>", line 3, in <lambda>
TypeError: 'float' object is not subscriptable
1 Answer
Here's how reduceByKey works. I am taking your example for illustration, i.e. with the data that you pass to reduceByKey. Let me go step by step.

After executing the mapValues function above, the RDD data looks like this:
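(Reconstructed by applying mapValues(lambda x: (x, 1)) to the four sample rows above.)

(('a', 'nexus4', 'stand'), ((-5.958191, 0.6880646, 8.135345), 1))
(('a', 'nexus4', 'stand'), ((-5.95224, 0.6702118, 8.136536), 1))
(('a', 'nexus4', 'stand'), ((-5.9950867, 0.6535491999999999, 8.204376), 1))
(('a', 'nexus4', 'stand'), ((-5.9427185, 0.6761626999999999, 8.128204), 1))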
So when reduceByKey is called, all the rows with the same key are grouped and their values are passed to the lambda function of reduceByKey. Since in your case all the keys are the same, the values are fed to the a and b variables over successive iterations. In the first iteration, a is ((-5.958191, 0.6880646, 8.135345), 1) and b is ((-5.95224, 0.6702118, 8.136536), 1),
so the computation (a[0][0] + b[0][0], a[0][1] + b[0][1], a[0][2] + b[0][2], a[1] + b[1]) is valid and goes through. In the second iteration, however, a is the output of that expression, i.e. (-11.910430999999999, 1.3582764, 16.271881, 2), which is a flat 4-tuple of floats rather than a ((x, y, z), count) pair.
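To see the failure without Spark at all, here is a minimal plain-Python sketch: functools.reduce applies the same lambda sequentially, just as reduceByKey does within a partition.

from functools import reduce

vals = [((-5.958191, 0.6880646, 8.135345), 1),
        ((-5.95224, 0.6702118, 8.136536), 1),
        ((-5.9950867, 0.6535491999999999, 8.204376), 1)]

# The first call returns a flat 4-tuple; on the second call a[0] is the
# plain float -11.910430999999999, so a[0][0] blows up.
reduce(lambda a, b: (a[0][0] + b[0][0], a[0][1] + b[0][1],
                     a[0][2] + b[0][2], a[1] + b[1]), vals)
# TypeError: 'float' object is not subscriptable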
So if you look at the format of that data, there is no a[0][0] in a; you can only get a[0], a[1], and so on. That's the issue, and that's what the error message is telling you too. The solution is to format the data so that a[0][0] remains valid on every iteration, which you can do by writing your reduceByKey in the following format:
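A sketch reconstructed from that description: the three running sums stay nested in their own tuple, so the accumulator keeps the same ((x, y, z), count) shape as the incoming values.

.reduceByKey(lambda a, b: ((a[0][0] + b[0][0], a[0][1] + b[0][1], a[0][2] + b[0][2]),
                           a[1] + b[1]))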
But this changes what your last mapValues sees, because the value (i.e. a in its lambda function) is now ((-23.848236199999995, 2.6879882999999998, 32.604461), 4), so a[0] refers to (-23.848236199999995, 2.6879882999999998, 32.604461) and a[1] refers to 4,
and there is nothing beyond that. So your last mapValues should index through a[0] and divide each component by the count in a[1]. Overall, the following code should work for you:
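A reconstruction of the full pipeline described above; only the reduceByKey and the final mapValues differ from your original code.

rdd_ori = sc.textFile("asdasd.csv") \
    .map(lambda x: ((x.split(",")[6], x.split(",")[7], x.split(",")[9]),
                    (float(x.split(",")[3]), float(x.split(",")[4]), float(x.split(",")[5]))))

meanRDD = rdd_ori.mapValues(lambda x: (x, 1)) \
    .reduceByKey(lambda a, b: ((a[0][0] + b[0][0], a[0][1] + b[0][1], a[0][2] + b[0][2]),
                               a[1] + b[1])) \
    .mapValues(lambda a: (a[0][0] / a[1], a[0][1] / a[1], a[0][2] / a[1]))

With your four sample rows, meanRDD.take(1) should then return approximately [(('a', 'nexus4', 'stand'), (-5.96205905, 0.671997075, 8.15111525))].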
I hope I have explained it well.