是否可以注册用Scala编写的UDF(或函数)在PySpark中使用?例如 . :
val mytable = sc.parallelize(1 to 2).toDF("spam")
mytable.registerTempTable("mytable")
def addOne(m: Integer): Integer = m + 1
// Spam: 1, 2
在Scala中,现在可以使用以下内容:
val UDFaddOne = sqlContext.udf.register("UDFaddOne", addOne _)
val mybiggertable = mytable.withColumn("moreSpam", UDFaddOne(mytable("spam")))
// Spam: 1, 2
// moreSpam: 2, 3
我想在PySpark中使用“UDFaddOne”
%pyspark
mytable = sqlContext.table("mytable")
UDFaddOne = sqlContext.udf("UDFaddOne") # does not work
mybiggertable = mytable.withColumn("+1", UDFaddOne(mytable("spam"))) # does not work
背景:我们是一个开发人员团队,一些用Scala编写,一些用Python编写,并且想分享已编写的函数 . 也可以将其保存到库中并导入它 .
2 回答
据我所知,PySpark没有提供任何等效的callUDF函数,因此无法直接访问已注册的UDF .
这里最简单的解决方案是使用原始SQL表达式:
这种方法相当有限,因此如果您需要支持更复杂的工作流,您应该构建一个包并提供完整的Python包装器 . 你会在我对Spark: How to map Python with Scala or Java User Defined Functions?的回答中找到和示例UDAF包装器
以下对我有用(基本上是多个地方的摘要,包括zero323提供的链接):
在scala中:
在python中(假设sc是spark上下文 . 如果你使用spark 2.0,你可以从spark会话中获取它):
当然要确保使用--jars和--driver-class-path添加在scala中创建的jar
那么这里发生了什么:
我们在一个可序列化的对象中创建一个函数,它返回scala中的udf(我不是100%确定需要Serializable,我需要更复杂的UDF,所以它可能是因为它需要传递java对象) .
在python中我们使用访问内部jvm(这是一个私有成员,所以它可以在将来更改但我看不到它)并使用java_import导入我们的包 . 我们访问createUDF函数并调用它 . 这将创建一个具有apply方法的对象(scala中的函数实际上是带有apply方法的java对象) . apply方法的输入是一列 . 应用列的结果是一个新列,因此我们需要使用Column方法将其包装,以使其可用于withColumn .