首页 文章

在pyspark中使用Scala UDF中的默认参数值?

提问于
浏览
1

我在Scala中定义了一个UDF,默认参数值如下:

package myUDFs

import org.apache.spark.sql.api.java.UDF3

class my_udf extends UDF3[Int, Int, Int, Int] {

  override def call(a: Int, b: Int, c: Int = 6): Int = {
    c*(a + b)
  }
}

然后我使用 build clean assembly 适当地构建它(如果需要可以提供更多的构建细节)并解压缩jar myUDFs-assembly-0.1.1.jar 并在Python的Spark配置中包含它:

from pyspark.conf import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.types import IntType

spark_conf = SparkConf().setAll([
    ('spark.jars', 'myUDFs-assembly-0.1.1.jar')
])

spark = SparkSession.builder \
    .appName('my_app') \
    .config(conf = spark_conf) \
    .enableHiveSupport() \
    .getOrCreate()

spark.udf.registerJavaFunction(
    "my_udf", "myUDFs.my_udf", IntType()
)

但是,当我尝试利用默认值时,我拒绝了:

spark.sql('select my_udf(1, 2)').collect()

AnalysisException:'函数my_udf的参数数量无效 . 预期:3;发现:2; line x pos y'

是否有可能像这样的默认值的UDF?输出应为 6*(1+2) = 18 .

2 回答

  • 1

    只看链条,就没有机会在这里识别默认参数 .

    * Register a deterministic Java UDF3 instance as user-defined function (UDF).
     * @since 1.3.0
     */
    def register(name: String, f: UDF3[_, _, _, _], returnType: DataType): Unit = {
      val func = f.asInstanceOf[UDF3[Any, Any, Any, Any]].call(_: Any, _: Any, _: Any)
      def builder(e: Seq[Expression]) = if (e.length == 3) {
        ScalaUDF(func, returnType, e, e.map(_ => true), udfName = Some(name))
      } else {
        throw new AnalysisException("Invalid number of arguments for function " + name +
          ". Expected: 3; Found: " + e.length)
      }
      functionRegistry.createOrReplaceTempFunction(name, builder)
    }
    

    如您所见, builder 仅在实际调度调用之前验证提供的表达式是否与函数的 arity 匹配 .

    您可能会更好地实现一个中间API,它将处理默认参数并在封面下调度到UDF . 但是这只适用于 DataFrame API,因此可能不适合您的需求 .

  • -2

    在spark sql中调用函数时,您只传递两个参数 . 尝试传递三个参数

    spark.sql('select my_udf(1, 2, 3 )').collect()
    

相关问题