我正在本地制作一些 Pyspark 代码的原型,该代码用于对点列表进行分类。我对 Spark 还是很陌生,所以可能我的技能不足在结果中起着很大的作用,但是我现在已经尝试了很多事情,并且我会非常感谢一些建议。

这是一小段代码,以显示主要思想:

kdt = kdtree.KDTree(np_pointcloud[:,0:2])
kdt_b = sc.broadcast(kdt)
t = points_parsed.map(lambda x: kdt_b.value.query_ball_point(x,r=window_size))

基本上,这个想法是从 Scipy 的 KDTree 实现中获得一棵树,该树将用于从 RDD 映射中为窗口中的一组邻居的每个输入点查询一组点。

我在 Python 笔记本中发现的错误似乎与 serealization 有关。无论如何,我都尝试在计算机中本地使用解决方法对 KDTree 进行酸洗,并且在加载酸洗的文件之后,该对象将按预期工作。我不确定错误是否与序列化有关,或者我是否无法正确理解要处理的概念。这个想法是广播已经建立的 KDTree 对象,以便每个工作人员都可以访问该对象,以便在执行地图时执行查询。

File "/usr/local/bin/spark-1.3.1-hadoop2.6/python/pyspark/broadcast.py",line 106, in value
self._value = self.load(self._path)
File "/usr/local/bin/spark-1.3.1binhadoop2.6/python/pyspark/broadcast.py",line 95, in load
return cPickle.loads(data)