首页 文章

Scala UDF在Pyspark中使用了多个参数

提问于
浏览
2

我有一个用Scala编写的UDF,我希望能够通过Pyspark会话进行调用 . UDF接受两个参数:字符串列值和第二个字符串参数 . 如果它只需要一个参数(列值),我就能成功调用UDF . 如果需要多个参数,我很难调用UDF . 这是我迄今为止在Scala和Pyspark中所做的事情:

Scala UDF:

class SparkUDFTest() extends Serializable {
  def stringLength(columnValue: String, columnName: String): Int =
      LOG.info("Column name is: " + columnName)
      return columnValue.length
}

在Scala中使用它时,我已经能够注册并使用这个UDF:

Scala主类:

val udfInstance = new SparkUDFTest()
val stringLength = spark.sqlContext.udf.register("stringlength", udfInstance.stringLength _)
val newDF = df.withColumn("name", stringLength(col("email"), lit("email")))

以上工作成功 . 这是Pyspark的尝试:

def testStringLength(colValue, colName):
  package = "com.test.example.udf.SparkUDFTest"
udfInstance = sc._jvm.java.lang.Thread.currentThread().getContextClassLoader().loadClass(testpackage).newInstance().stringLength().apply
  return Column(udfInstance(_to_seq(sc, [colValue], _to_java_column), colName))

在Pyspark中调用UDF:

df.withColumn("email", testStringLength("email", lit("email")))

执行上述操作并在Pyspark中进行一些调整会出现以下错误:

py4j.Py4JException: Method getStringLength([]) does not exist
or
java.lang.ClassCastException: com.test.example.udf.SparkUDFTest$$anonfun$stringLength$1 cannot be cast to scala.Function1
or
TypeError: 'Column' object is not callable

我能够修改UDF只采用一个参数(列值),并能够成功调用它并返回一个新的Dataframe .

Scala UDF类

class SparkUDFTest() extends Serializable {
  def testStringLength(): UserDefinedFunction = udf(stringLength _)
  def stringLength(columnValue: String): Int =
      LOG.info("Column name is: " + columnName)
      return columnValue.length
}

更新Python代码:

def testStringLength(colValue, colName):
  package = "com.test.example.udf.SparkUDFTest"
  udfInstance = sc._jvm.java.lang.Thread.currentThread().getContextClassLoader().loadClass(testpackage).newInstance().testStringLength().apply
  return Column(udfInstance(_to_seq(sc, [colValue], _to_java_column)))

以上工作成功 . 如果UDF采用额外的参数,我仍然在努力调用UDF . 如何通过Pyspark将第二个参数传递给UDF?

1 回答

  • 0

    我能够通过使用currying来解决这个问题 . 首先将UDF注册为

    def testStringLength(columnName): UserDefinedFunction = udf((colValue: String) => stringLength(colValue, colName)
    

    称为UDF

    udfInstance = sc._jvm.java.lang.Thread.currentThread().getContextClassLoader().loadClass(testpackage).newInstance().testStringLength("email").apply
    df.withColumn("email", Column(udfInstance(_to_seq(sc, [col("email")], _to_java_column))))
    

    这可以清理一下,但这是我如何使它工作 .

    编辑:我使用currying的原因是因为即使我在第二个参数上使用'lit'我想作为String传递给UDF,我仍然经历了“TypeError:'Column'对象不可调用”错误 . 在Scala中我没有遇到过这个问题 . 我不确定为什么在Pyspark发生这种情况 . 这可能是由于Python解释器和Scala代码之间可能出现的一些复杂问题 . 仍然不清楚,但curry适合我 .

相关问题