我目前正在学习Python,并希望将其应用于/使用Spark . 我有这个非常简单(无用)的脚本:
import sys
from pyspark import SparkContext
class MyClass:
def __init__(self, value):
self.v = str(value)
def addValue(self, value):
self.v += str(value)
def getValue(self):
return self.v
if __name__ == "__main__":
if len(sys.argv) != 1:
print("Usage CC")
exit(-1)
data = [1, 2, 3, 4, 5, 2, 5, 3, 2, 3, 7, 3, 4, 1, 4]
sc = SparkContext(appName="WordCount")
d = sc.parallelize(data)
inClass = d.map(lambda input: (input, MyClass(input)))
reduzed = inClass.reduceByKey(lambda a, b: a.addValue(b.getValue))
print(reduzed.collect())
用它执行时
spark-submit CustomClass.py
..以下错误是thorwn(输出缩短):
引起:org.apache.spark.api.python.PythonException:Traceback(最近一次调用最后一次):文件"/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py",第111行,主进程()文件"/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py",第106行,进程serializer.dump_stream(func(split_index, iterator),outfile)文件"/usr/local/spark/python/lib/pyspark.zip/pyspark/serializers.py",第133行,在dump_stream中为obj在迭代器中:文件"/usr/local/spark/python/lib/pyspark.zip/pyspark/rdd.py",第1728行,在add_shuffle_key文件"/usr/local/spark/python/lib/pyspark.zip/pyspark/serializers.py",第415行,在转储中返回pickle.dumps(obj,protocol)PicklingError:不能pickle main .MyClass:属性查找 main .MyClass在org.apache.spark.api.python.PythonRunner $$ anon $ 1.read(PythonRDD.scala:166)失败...
声明给我
PicklingError:无法pickle main.MyClass:属性查找main.MyClass失败
似乎很重要 . 这意味着类实例无法序列化,对吧?你知道如何解决这个问题吗?
感谢致敬
1 回答
有很多问题:
如果将
MyClass
放在单独的文件中,则可以进行酸洗 . 这是许多Python使用pickle的常见问题 . 通过移动MyClass
和使用from myclass import MyClass
可以很容易地解决这个问题 . 通常dill
可以修复这些问题(如import dill as pickle
),但它在这里对我不起作用 .一旦解决了这个问题,你的reduce就不起作用了,因为调用
addValue
返回None
(没有返回),而不是MyClass
的实例 . 您需要更改addValue
以返回self
.最后,
lambda
需要调用getValue
,所以应该a.addValue(b.getValue())
在一起:
myclass.py
main.py