首页 文章

如何在Apache Spark(pyspark)中使用自定义类?

提问于
浏览
17

我在python中编写了一个实现分类器的类 . 我想使用Apache Spark来使用此分类器并行化大量数据点的分类 .

  • 我'm set up using Amazon EC2 on a cluster with 10 slaves, based off an ami that comes with python' s Anaconda分发就可以了 . ami让我可以远程使用IPython Notebook .

  • 我在文件调用BoTree.py中定义了类BoTree,该文件调用文件名为/root/anaconda/lib/python2.7/,这是我所有的python模块都在

  • 我已经检查过从主服务器运行命令行spark时我可以导入和使用BoTree.py(我只需要从编写导入BoTree开始,我的类BoTree就可以了

  • I 've used spark' s /root/spark-ec2/copy-dir.sh脚本在我的集群中复制/python2.7/目录 .

  • 我已经闯入其中一个奴隶并尝试在那里运行ipython,并且能够导入BoTree,所以我认为该模块已成功发送到集群中(我还可以看到该模块中的BoTree.py文件) . ../python2.7/文件夹)

  • 在主I 've checked I can pickle and unpickle a BoTree instance using cPickle, which I understand is pyspark' s序列化器上 .

However ,当我执行以下操作时:

import BoTree
bo_tree = BoTree.train(data)
rdd = sc.parallelize(keyed_training_points) #create rdd of 10 (integer, (float, float) tuples
rdd = rdd.mapValues(lambda point, bt = bo_tree: bt.classify(point[0], point[1]))
out = rdd.collect()

Spark失败并出现错误(我认为只是相关的一点):

File "/root/spark/python/pyspark/worker.py", line 90, in main
    command = pickleSer.loads(command.value)
  File "/root/spark/python/pyspark/serializers.py", line 405, in loads
    return cPickle.loads(obj)
ImportError: No module named BoroughTree

谁能帮我?有点绝望......

谢谢

2 回答

  • 15

    可能最简单的解决方案是在创建 SparkContext 时使用 pyFiles 参数

    from pyspark import SparkContext
    sc = SparkContext(master, app_name, pyFiles=['/path/to/BoTree.py'])
    

    放在那里的每个文件都将发送给 Worker 并添加到 PYTHONPATH .

    如果您在交互模式下工作,则必须在创建新上下文之前使用 sc.stop() 停止现有上下文 .

    还要确保Spark worker实际上使用的是Anaconda发行版,而不是默认的Python解释器 . 根据您的描述,这很可能是问题所在 . 要设置 PYSPARK_PYTHON ,您可以使用 conf/spark-env.sh 文件 .

    另外,将文件复制到 lib 是一个相当混乱的解决方案 . 如果你想避免使用 pyFiles 推送文件,我建议你创建普通的Python包或Conda包以及正确的安装 . 通过这种方式,您可以轻松跟踪已安装的内容,删除不必要的软件包并避免一些难以调试的问题 .

  • 11

    获取SparkContext后,也可以使用 addPyFile 随后将模块发送给每个工作者 .

    sc.addPyFile('/path/to/BoTree.py')
    

    pyspark.SparkContext.addPyFile(path) documentation

相关问题