从任务中调用 Java/Scala 函数

背景

我最初的问题是,为什么在地图函数中使用DecisionTreeModel.predict会引发异常?并与如何使用 MLlib 在 Spark 上生成元组(原始标签,预测标签)?有关

当我们使用 Scala API 推荐的方式并使用DecisionTreeModel获得RDD[LabeledPoint]的预测时,只需在RDD上进行映射:

val labelAndPreds = testData.map { point =>
  val prediction = model.predict(point.features)
  (point.label, prediction)
}

不幸的是,PySpark 中的类似方法不能很好地工作:

labelsAndPredictions = testData.map(
    lambda lp: (lp.label, model.predict(lp.features))
labelsAndPredictions.first()

例外:似乎您正在尝试从广播变量,操作或转换中引用 SparkContext。 SparkContext 只能在驱动程序上使用,而不能在工作程序上运行的代码中使用。有关更多信息,请参见SPARK-5063

而不是官方文件建议这样的事情:

predictions = model.predict(testData.map(lambda x: x.features))
labelsAndPredictions = testData.map(lambda lp: lp.label).zip(predictions)

那么这是怎么回事?此处没有广播变量,并且Scala API定义predict如下:

/**
 * Predict values for a single data point using the model trained.
 *
 * @param features array representing a single data point
 * @return Double prediction from the trained model
 */
def predict(features: Vector): Double = {
  topNode.predict(features)
}

/**
 * Predict values for the given data set using the model trained.
 *
 * @param features RDD representing data points to be predicted
 * @return RDD of predictions for each of the given data points
 */
def predict(features: RDD[Vector]): RDD[Double] = {
  features.map(x => predict(x))
}

因此至少乍一看,从动作或转换中调用就不是问题,因为预测似乎是本地操作。

说明

经过一番挖掘,我发现问题的根源是从DecisionTreeModel.predict调用的JavaModelWrapper.call方法。调用 Java 函数所需的访问 SparkContext

callJavaFunc(self._sc, getattr(self._java_model, name), *a)

对于DecisionTreeModel.predict,有一个建议的解决方法,所有必需的代码已经是 Scala API 的一部分,但是一般来说,是否有任何优雅的方式来处理此类问题?

我现在能想到的只有解决方案才是重量级的:

  • 通过隐式转换扩展 Spark 类或添加某种包装将所有内容推送到 JVM

  • 直接使用 Py4j 网关

回答(1)

3 years ago

根本不可能使用默认的 Py4J 网关进行通信。要了解为什么我们必须看一下 PySpark Internals 文档[1]中的下图:

在此处输入图片说明

由于 Py4J 网关在驱动程序上运行,Python 解释器无法通过套接字与 JVM 工作者进行通信(请参见PythonRDD/rdd.py),因此 Python 解释器无法访问它。

从理论上讲,有可能为每个工作人员创建一个单独的 Py4J 网关,但是在实践中,它不太可能有用。忽略诸如可靠性之类的问题 Py4J 并非旨在执行数据密集型任务。

有什么解决方法吗?

优点:支持的高级级别,不需要访问内部 PySpark API

缺点:相对冗长且没有很好的记录,主要限于输入数据

  • 使用 Scala UDF 在 DataFrame 上进行操作。

优点:易于实现(请参见Spark:如何使用 Scala 或 Java 用户定义函数映射 Python?),如果数据已存储在 DataFrame 中,则 Python 和 Scala 之间不会进行数据转换,对 Py4J 的访问最少

缺点:需要访问 Py4J 网关和内部方法,仅限于 Spark SQL,难以调试,不支持

  • 以与 MLlib 中类似的方式创建高级 Scala 接口。

优点:灵活,能够执行任意复杂的代码。可以直接在 RDD 上(例如参见MLlib 模型包装器)或与DataFrames(参见如何在 Pyspark 内部使用 Scala 类)一起使用。后一种解决方案似乎更友好,因为所有 ser-de 细节均已由现有 API 处理。

缺点:低级,必需的数据转换,与 UDF 相同,要求访问 Py4J 和内部 API,不支持

一些基本示例可以在使用 Scala 转换 PySpark RDD中找到

  • 使用外部工作流管理工具在 Python 和 Scala/Java 作业之间切换,并将数据传递到 DFS。

优点:易于实现,对代码本身的更改最少

缺点:读取/写入数据的费用(阿卢西奥?)

  • 使用共享的SQLContext(例如参见阿帕奇·齐柏林里维)在已注册的临时表中在来宾语言之间传递数据。

优点:非常适合交互式分析

缺点:对于批处理作业(Zeppelin)而言并没有那么多,或者可能需要其他编排(Livy)