首页 文章

PySpark:在RDD中使用Object

提问于
浏览
4

我目前正在学习Python,并希望将其应用于/使用Spark . 我有这个非常简单(无用)的脚本:

import sys
from pyspark import SparkContext

class MyClass:
    def __init__(self, value):
        self.v = str(value)

    def addValue(self, value):
        self.v += str(value)

    def getValue(self):
        return self.v

if __name__ == "__main__":
    if len(sys.argv) != 1:
        print("Usage CC")
        exit(-1)

    data = [1, 2, 3, 4, 5, 2, 5, 3, 2, 3, 7, 3, 4, 1, 4]
    sc = SparkContext(appName="WordCount")
    d = sc.parallelize(data)
    inClass = d.map(lambda input: (input, MyClass(input)))
    reduzed = inClass.reduceByKey(lambda a, b: a.addValue(b.getValue))
    print(reduzed.collect())

用它执行时

spark-submit CustomClass.py

..以下错误是thorwn(输出缩短):

引起:org.apache.spark.api.python.PythonException:Traceback(最近一次调用最后一次):文件"/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py",第111行,主进程()文件"/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py",第106行,进程serializer.dump_stream(func(split_index, iterator),outfile)文件"/usr/local/spark/python/lib/pyspark.zip/pyspark/serializers.py",第133行,在dump_stream中为obj在迭代器中:文件"/usr/local/spark/python/lib/pyspark.zip/pyspark/rdd.py",第1728行,在add_shuffle_key文件"/usr/local/spark/python/lib/pyspark.zip/pyspark/serializers.py",第415行,在转储中返回pickle.dumps(obj,protocol)PicklingError:不能pickle main .MyClass:属性查找 main .MyClass在org.apache.spark.api.python.PythonRunner $$ anon $ 1.read(PythonRDD.scala:166)失败...

声明给我

PicklingError:无法pickle main.MyClass:属性查找main.MyClass失败

似乎很重要 . 这意味着类实例无法序列化,对吧?你知道如何解决这个问题吗?

感谢致敬

1 回答

  • 11

    有很多问题:

    • 如果将 MyClass 放在单独的文件中,则可以进行酸洗 . 这是许多Python使用pickle的常见问题 . 通过移动 MyClass 和使用 from myclass import MyClass 可以很容易地解决这个问题 . 通常 dill 可以修复这些问题(如 import dill as pickle ),但它在这里对我不起作用 .

    • 一旦解决了这个问题,你的reduce就不起作用了,因为调用 addValue 返回 None (没有返回),而不是 MyClass 的实例 . 您需要更改 addValue 以返回 self .

    • 最后, lambda 需要调用 getValue ,所以应该 a.addValue(b.getValue())

    在一起: myclass.py

    class MyClass:
        def __init__(self, value):
            self.v = str(value)
    
        def addValue(self, value):
            self.v += str(value)
            return self
    
        def getValue(self):
            return self.v
    

    main.py

    import sys
    from pyspark import SparkContext
    from myclass import MyClass
    
    if __name__ == "__main__":
        if len(sys.argv) != 1:
            print("Usage CC")
            exit(-1)
    
        data = [1, 2, 3, 4, 5, 2, 5, 3, 2, 3, 7, 3, 4, 1, 4]
        sc = SparkContext(appName="WordCount")
        d = sc.parallelize(data)
        inClass = d.map(lambda input: (input, MyClass(input)))
        reduzed = inClass.reduceByKey(lambda a, b: a.addValue(b.getValue()))
        print(reduzed.collect())
    

相关问题