首页 文章

从Scala注册UDF到SqlContext以在PySpark中使用

提问于
浏览
4

是否可以注册用Scala编写的UDF(或函数)在PySpark中使用?例如 . :

val mytable = sc.parallelize(1 to 2).toDF("spam")
mytable.registerTempTable("mytable")
def addOne(m: Integer): Integer = m + 1
// Spam: 1, 2

在Scala中,现在可以使用以下内容:

val UDFaddOne = sqlContext.udf.register("UDFaddOne", addOne _)
val mybiggertable = mytable.withColumn("moreSpam", UDFaddOne(mytable("spam")))
// Spam: 1, 2
// moreSpam: 2, 3

我想在PySpark中使用“UDFaddOne”

%pyspark

mytable = sqlContext.table("mytable")
UDFaddOne = sqlContext.udf("UDFaddOne") # does not work
mybiggertable = mytable.withColumn("+1", UDFaddOne(mytable("spam"))) # does not work

背景:我们是一个开发人员团队,一些用Scala编写,一些用Python编写,并且想分享已编写的函数 . 也可以将其保存到库中并导入它 .

2 回答

  • 3

    据我所知,PySpark没有提供任何等效的callUDF函数,因此无法直接访问已注册的UDF .

    这里最简单的解决方案是使用原始SQL表达式:

    mytable.withColumn("moreSpam", expr("UDFaddOne({})".format("spam")))
    
    ## OR
    sqlContext.sql("SELECT *, UDFaddOne(spam) AS moreSpam FROM mytable")
    
    ## OR
    mytable.selectExpr("*", "UDFaddOne(spam) AS moreSpam")
    

    这种方法相当有限,因此如果您需要支持更复杂的工作流,您应该构建一个包并提供完整的Python包装器 . 你会在我对Spark: How to map Python with Scala or Java User Defined Functions?的回答中找到和示例UDAF包装器

  • 3

    以下对我有用(基本上是多个地方的摘要,包括zero323提供的链接):

    在scala中:

    package com.example
    import org.apache.spark.sql.functions.udf
    
    object udfObj extends Serializable {
      def createUDF = {
        udf((x: Int) => x + 1)
      }
    }
    

    在python中(假设sc是spark上下文 . 如果你使用spark 2.0,你可以从spark会话中获取它):

    from py4j.java_gateway import java_import
    from pyspark.sql.column import Column
    
    jvm = sc._gateway.jvm
    java_import(jvm, "com.example")
    def udf_f(col):
        return Column(jvm.com.example.udfObj.createUDF().apply(col))
    

    当然要确保使用--jars和--driver-class-path添加在scala中创建的jar

    那么这里发生了什么:

    我们在一个可序列化的对象中创建一个函数,它返回scala中的udf(我不是100%确定需要Serializable,我需要更复杂的UDF,所以它可能是因为它需要传递java对象) .

    在python中我们使用访问内部jvm(这是一个私有成员,所以它可以在将来更改但我看不到它)并使用java_import导入我们的包 . 我们访问createUDF函数并调用它 . 这将创建一个具有apply方法的对象(scala中的函数实际上是带有apply方法的java对象) . apply方法的输入是一列 . 应用列的结果是一个新列,因此我们需要使用Column方法将其包装,以使其可用于withColumn .

相关问题