背景
我最初的问题是,为什么在地图函数中使用DecisionTreeModel.predict
会引发异常?并与如何使用 MLlib 在 Spark 上生成元组(原始标签,预测标签)?有关
当我们使用 Scala API 推荐的方式并使用DecisionTreeModel
获得RDD[LabeledPoint]
的预测时,只需在RDD
上进行映射:
val labelAndPreds = testData.map { point =>
val prediction = model.predict(point.features)
(point.label, prediction)
}
不幸的是,PySpark 中的类似方法不能很好地工作:
labelsAndPredictions = testData.map(
lambda lp: (lp.label, model.predict(lp.features))
labelsAndPredictions.first()
例外:似乎您正在尝试从广播变量,操作或转换中引用 SparkContext。 SparkContext 只能在驱动程序上使用,而不能在工作程序上运行的代码中使用。有关更多信息,请参见SPARK-5063。
而不是官方文件建议这样的事情:
predictions = model.predict(testData.map(lambda x: x.features))
labelsAndPredictions = testData.map(lambda lp: lp.label).zip(predictions)
那么这是怎么回事?此处没有广播变量,并且Scala API定义predict
如下:
/**
* Predict values for a single data point using the model trained.
*
* @param features array representing a single data point
* @return Double prediction from the trained model
*/
def predict(features: Vector): Double = {
topNode.predict(features)
}
/**
* Predict values for the given data set using the model trained.
*
* @param features RDD representing data points to be predicted
* @return RDD of predictions for each of the given data points
*/
def predict(features: RDD[Vector]): RDD[Double] = {
features.map(x => predict(x))
}
因此至少乍一看,从动作或转换中调用就不是问题,因为预测似乎是本地操作。
说明
经过一番挖掘,我发现问题的根源是从DecisionTreeModel.predict调用的JavaModelWrapper.call方法。调用 Java 函数所需的访问 SparkContext
:
callJavaFunc(self._sc, getattr(self._java_model, name), *a)
题
对于DecisionTreeModel.predict
,有一个建议的解决方法,所有必需的代码已经是 Scala API 的一部分,但是一般来说,是否有任何优雅的方式来处理此类问题?
我现在能想到的只有解决方案才是重量级的:
-
通过隐式转换扩展 Spark 类或添加某种包装将所有内容推送到 JVM
-
直接使用 Py4j 网关
1 回答
根本不可能使用默认的 Py4J 网关进行通信。要了解为什么我们必须看一下 PySpark Internals 文档[1]中的下图:
由于 Py4J 网关在驱动程序上运行,Python 解释器无法通过套接字与 JVM 工作者进行通信(请参见PythonRDD/rdd.py),因此 Python 解释器无法访问它。
从理论上讲,有可能为每个工作人员创建一个单独的 Py4J 网关,但是在实践中,它不太可能有用。忽略诸如可靠性之类的问题 Py4J 并非旨在执行数据密集型任务。
有什么解决方法吗?
优点:支持的高级级别,不需要访问内部 PySpark API
缺点:相对冗长且没有很好的记录,主要限于输入数据
优点:易于实现(请参见Spark:如何使用 Scala 或 Java 用户定义函数映射 Python?),如果数据已存储在 DataFrame 中,则 Python 和 Scala 之间不会进行数据转换,对 Py4J 的访问最少
缺点:需要访问 Py4J 网关和内部方法,仅限于 Spark SQL,难以调试,不支持
优点:灵活,能够执行任意复杂的代码。可以直接在 RDD 上(例如参见MLlib 模型包装器)或与
DataFrames
(参见如何在 Pyspark 内部使用 Scala 类)一起使用。后一种解决方案似乎更友好,因为所有 ser-de 细节均已由现有 API 处理。缺点:低级,必需的数据转换,与 UDF 相同,要求访问 Py4J 和内部 API,不支持
一些基本示例可以在使用 Scala 转换 PySpark RDD中找到
优点:易于实现,对代码本身的更改最少
缺点:读取/写入数据的费用(阿卢西奥?)
SQLContext
(例如参见阿帕奇·齐柏林或里维)在已注册的临时表中在来宾语言之间传递数据。优点:非常适合交互式分析
缺点:对于批处理作业(Zeppelin)而言并没有那么多,或者可能需要其他编排(Livy)