首页 文章

PySpark . 将Dataframe传递给pandas_udf并返回一个系列

提问于
浏览
0

我'm using PySpark'的新 pandas_udf 装饰器,我试图让它将多列作为输入并返回一个系列作为输入,但是,我得到一个 TypeError: Invalid argument

示例代码

@pandas_udf(df.schema, PandasUDFType.SCALAR)
def fun_function(df_in):
    df_in.loc[df_in['a'] < 0] = 0.0
    return (df_in['a'] - df_in['b']) / df_in['c']

1 回答

  • 2

    A SCALAR udf期望pandas系列作为输入而不是数据帧 . 对于您的情况,没有必要使用udf . 剪切后列 abc 的直接计算应该有效:

    import pyspark.sql.functions as f
    
    df = spark.createDataFrame([[1,2,4],[-1,2,2]], ['a', 'b', 'c'])
    
    clip = lambda x: f.when(df.a < 0, 0).otherwise(x)
    df.withColumn('d', (clip(df.a) - clip(df.b)) / clip(df.c)).show()
    
    #+---+---+---+-----+
    #|  a|  b|  c|    d|
    #+---+---+---+-----+
    #|  1|  2|  4|-0.25|
    #| -1|  2|  2| null|
    #+---+---+---+-----+
    

    如果你必须使用 pandas_udf ,你的返回类型必须是 double ,而不是 df.schema ,因为你只返回一个pandas系列而不是pandas数据帧;此外,您还需要将列作为Series传递到函数而不是整个数据框:

    @pandas_udf('double', PandasUDFType.SCALAR)
    def fun_function(a, b, c):
        clip = lambda x: x.where(a >= 0, 0)
        return (clip(a) - clip(b)) / clip(c)
    
    df.withColumn('d', fun_function(df.a, df.b, df.c)).show()
    #+---+---+---+-----+                                                             
    #|  a|  b|  c|    d|
    #+---+---+---+-----+
    #|  1|  2|  4|-0.25|
    #| -1|  2|  2| null|
    #+---+---+---+-----+
    

相关问题