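I'm trying to define a UDF that uses an accumulator. The accumulator collects the exceptions raised in my_function so I can inspect them later. I came up with a UDF definition that takes two parameters (returnType, accumulator), but I'd like to make it more readable and reusable. How can I turn the code below into a decorator?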

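from pyspark.sql import SparkSession, Row
from pyspark.sql import functions as F
from pyspark.accumulators import AccumulatorParam
from pyspark.sql.types import StringType, StructField, IntegerType, StructType

# `spark` already exists in the pyspark shell; outside it, create a session
spark = SparkSession.builder.getOrCreate()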

data = [
    Row(word="foo", number=7),
    Row(word="bar", number=13)]

schema = StructType([
    StructField("word", StringType(), True),
    StructField("number", IntegerType(), True)])

df = spark.createDataFrame(data, schema)

Creating my custom accumulator:

class ListParam(AccumulatorParam):
    def zero(self, v):
        return []

    def addInPlace(self, variable, value):
        variable.append(value)
        return variable

accum = spark.sparkContext.accumulator([], ListParam())

Definition of my UDF:

def accumulator_udf(accumulator, returnType):
    def my_function(x):
        y = None
        try:
            # `//` keeps integer division on Python 3 as well, matching IntegerType
            y = x // (x - 7)
        except Exception as e:
            # Record the error and the offending input; the row itself becomes null
            accumulator.add({'errorType': str(e), 'Data': x})
        return y

    return F.udf(my_function, returnType)

my_udf = accumulator_udf(accumulator=accum, returnType=IntegerType())
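This factory works because `my_function` is defined inside `accumulator_udf`, so `accumulator` is a free variable captured in a closure rather than a name looked up globally. A minimal pure-Python sketch of that, assuming a plain list (`errors`, a hypothetical name) stands in for the accumulator and the `F.udf` call is dropped so it runs without Spark:

```python
def accumulator_udf(accumulator, returnType):
    def my_function(x):
        y = None
        try:
            y = x // (x - 7)
        except Exception as e:
            # `accumulator` resolves to the enclosing call's argument (a closure cell)
            accumulator.append({'errorType': str(e), 'Data': x})
        return y
    # Real code: return F.udf(my_function, returnType); dropped here to run without Spark
    return my_function

errors = []  # plain list standing in for the Spark accumulator
fn = accumulator_udf(accumulator=errors, returnType=int)

free_vars = fn.__code__.co_freevars                   # ('accumulator',)
captured = fn.__closure__[0].cell_contents is errors  # True: the cell holds the list passed in
outputs = [fn(7), fn(13)]                             # [None, 2]
```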

Result:

df.select(my_udf(df.number)).show()

  +---------------+
  |div_one(number)|
  +---------------+
  |           null|
  |              2|
  +---------------+

print(accum.value)
> [[{'errorType': 'integer division or modulo by zero', 'Data': 7}], []]
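The nested result is consistent with how, as far as I understand PySpark's accumulator protocol, values are merged: each task starts from `zero()` and applies its own `add()` calls locally, then the driver folds each task's partial value into its own with `addInPlace`. Because this `addInPlace` appends, every task's whole list becomes a single element, including the empty list from the task where nothing failed. The sketch below simulates that in plain Python without Spark; `simulate` and `FlatListParam` are hypothetical names, and `FlatListParam` is a concatenating variant for which callers would pass `[record]` to `add()`:

```python
class ListParam:
    """Mirrors the question's AccumulatorParam subclass (no Spark base class here)."""
    def zero(self, v):
        return []

    def addInPlace(self, variable, value):
        variable.append(value)
        return variable


class FlatListParam:
    """Hypothetical variant: merges by concatenation, so add() expects a list."""
    def zero(self, v):
        return []

    def addInPlace(self, variable, value):
        variable.extend(value)
        return variable


def simulate(param, per_task_adds):
    """Hypothetical model of worker-side adds followed by driver-side merging."""
    task_values = []
    for adds in per_task_adds:
        local = param.zero(None)           # each task starts from zero()
        for term in adds:
            local = param.addInPlace(local, term)
        task_values.append(local)
    driver = param.zero(None)              # driver value starts empty as well
    for partial in task_values:
        driver = param.addInPlace(driver, partial)
    return driver


err = {'errorType': 'integer division or modulo by zero', 'Data': 7}
nested = simulate(ListParam(), [[err], []])       # [[err], []]
flat = simulate(FlatListParam(), [[[err]], []])   # [err]
```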

What I tried:

After some reading, I found this post (https://www.thecodeship.com/patterns/guide-to-python-function-decorators/) helpful, but I ran into a NameError:

def accumulator_udf(accumulator, returnType):
    def func_wrapper(func):
        return F.udf(func, returnType)
    return func_wrapper

accum = spark.sparkContext.accumulator([], ListParam())
@accumulator_udf(accumulator=accum , returnType=IntegerType())
def my_function(x):
    y = None
    try:
        y = x // (x - 7)
    except Exception as e:
        accumulator.add({'errorType': str(e), 'Data': x})
    return y

df.select(my_function(df.number)).show()

When I try this implementation, I get the following error:

NameError: global name 'accumulator' is not defined
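The error comes from Python's scoping rules rather than from Spark: `accumulator` is a parameter of `accumulator_udf`, so it only exists inside that function and inside functions defined within it. `my_function` is defined at module level, so Python compiles `accumulator` as a global lookup, which fails as soon as the `except` branch runs. A minimal pure-Python reproduction, assuming the decorator returns `func` unchanged in place of `F.udf` and a plain list (`errors`, hypothetical) stands in for the accumulator:

```python
def accumulator_udf(accumulator, returnType):
    def func_wrapper(func):
        # Real code: return F.udf(func, returnType); identity here to run without Spark
        return func
    return func_wrapper

errors = []  # plain list standing in for the Spark accumulator

@accumulator_udf(accumulator=errors, returnType=int)
def my_function(x):
    y = None
    try:
        y = x // (x - 7)
    except Exception as e:
        # Defined at module level, so `accumulator` is compiled as a global lookup
        accumulator.append({'errorType': str(e), 'Data': x})
    return y

no_free_vars = my_function.__code__.co_freevars == ()
global_lookup = 'accumulator' in my_function.__code__.co_names

try:
    my_function(7)  # hits the except branch, which needs `accumulator`
    raised = False
except NameError:
    raised = True
```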

How can I make `accumulator` accessible inside my_function?
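One possible approach, sketched under assumptions: let the decorated function take the accumulator as an explicit first parameter, and have the decorator bind it in a closure before handing the wrapper to the UDF constructor. To keep the sketch runnable without Spark, `fake_udf` and `FakeAccumulator` are hypothetical stand-ins for `F.udf` and a real accumulator; in Spark you would call `F.udf(wrapper, returnType)` and pass `accum`.

```python
import functools


def fake_udf(func, returnType):
    """Hypothetical stand-in for pyspark.sql.functions.udf so this runs without Spark."""
    return func


class FakeAccumulator:
    """Hypothetical stand-in for a Spark accumulator: only .add() and .value."""
    def __init__(self):
        self.value = []

    def add(self, term):
        self.value.append(term)


def accumulator_udf(accumulator, returnType):
    def func_wrapper(func):
        @functools.wraps(func)  # keep func's __name__ on the wrapper
        def wrapper(*args):
            # The closure carries `accumulator`; pass it in as the first argument
            return func(accumulator, *args)
        return fake_udf(wrapper, returnType)  # real code: F.udf(wrapper, returnType)
    return func_wrapper


accum = FakeAccumulator()


@accumulator_udf(accumulator=accum, returnType=int)
def my_function(accumulator, x):
    y = None
    try:
        y = x // (x - 7)
    except Exception as e:
        accumulator.add({'errorType': str(e), 'Data': x})
    return y


results = [my_function(7), my_function(13)]  # [None, 2]
```

`functools.wraps` keeps the original `__name__`, which PySpark uses when naming the UDF. With the real `F.udf`, `df.select(my_function(df.number))` should then behave like the factory version, since the wrapper's closure carries the accumulator to the executors the same way.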

Thanks!