
PySpark DataFrame UDF exception handling


I have written a UDF to be used with Spark in Python. The function takes a date (as a string, e.g. '2017-01-06') and an array of date strings (e.g. ['2017-01-26', '2017-02-26', '2017-04-17']) and returns the number of days since the closest previous date. The UDF is

import datetime

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

def findClosestPreviousDate(currdate, date_list):
    date_format = "%Y-%m-%d"
    currdate = datetime.datetime.strptime(currdate, date_format)
    result = currdate
    # parse every non-null date string in the array
    date_list = [datetime.datetime.strptime(x, date_format) for x in date_list if x != None]
    lowestdiff = 10000
    for dt in date_list:
        # only consider dates strictly before currdate
        if(dt >= currdate):
            continue
        delta = currdate-dt
        diff = delta.days
        if(diff < lowestdiff):
            lowestdiff = diff
            result = dt
    dlt = currdate-result
    return dlt.days


findClosestPreviousDateUdf = udf(findClosestPreviousDate,StringType())

I am calling it as below:

findClosestPreviousDateUdf = udf(findClosestPreviousDate,StringType())
grouped_extend_df2 = grouped_extend_email_rec.withColumn('recency_eng', func.when(size(col("activity_arr")) > 0, findClosestPreviousDateUdf("expanded_datestr", "activity_arr")).otherwise(0))

Even after removing all the null values from the column "activity_arr", I keep getting this NoneType error. I also tried handling the exception inside the function (still the same result).

Is there a better way to catch error records at runtime while the UDF is running (perhaps using accumulators or the like; I have seen a few people attempt it with Scala)? A sketch of such an approach is given after the error output below.

Error:

---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<ipython-input> in <module>()
----> 1 grouped_extend_df2.show()

/usr/lib/spark/python/pyspark/sql/dataframe.pyc in show(self, n, truncate)
    334         """
    335         if isinstance(truncate, bool) and truncate:
--> 336             print(self._jdf.showString(n, 20))
    337         else:
    338             print(self._jdf.showString(n, int(truncate)))
...

Py4JJavaError: An error occurred while calling o1111.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 315.0 failed 1 times, most recent failure: Lost task 0.0 in stage 315.0 (TID 18390, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 177, in main
    process()
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 172, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 104, in <lambda>
    func = lambda _, it: map(mapper, it)
  File "<string>", line 1, in <lambda>
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 71, in <lambda>
    return lambda *a: f(*a)
  File "<string>", line 5, in findClosestPreviousDate
TypeError: 'NoneType' object is not iterable

	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
	at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
	at org.apache.spark.sql.execution.python.BatchEvalPythonExec$$anonfun$doExecute$1.apply(BatchEvalPythonExec.scala:144)
	...
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:108)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1517)
	...
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  ...
  File "<string>", line 5, in findClosestPreviousDate
TypeError: 'NoneType' object is not iterable
	... 1 more
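As for catching bad records at runtime: a minimal sketch of the accumulator idea is shown below. It is my own illustration rather than code from this post; findClosestPreviousDateSafe and error_count are made-up names, and the findClosestPreviousDate function defined above is assumed to be in scope.

# Hedged sketch: wrap the UDF body in try/except, count failures with an
# accumulator, and return null for rows the function cannot handle.
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

spark = SparkSession.builder.getOrCreate()
error_count = spark.sparkContext.accumulator(0)  # updated on the executors

def findClosestPreviousDateSafe(currdate, date_list):
    try:
        return findClosestPreviousDate(currdate, date_list)  # the function above
    except Exception:
        error_count.add(1)  # value is only meaningful after an action completes
        return None         # null marks the bad row in the result column

findClosestPreviousDateSafeUdf = udf(findClosestPreviousDateSafe, IntegerType())

After an action (for example grouped_extend_df2.count()), error_count.value gives the number of rows the UDF failed on, and rows where the result column is null can then be pulled out for inspection. Note that task retries can make an accumulator over-count.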

2 Answers

  • 0

    I tried your udf, but it kept returning 0 (int).

    dlt = currdate-result # result and currdate are same
    return dlt.days # days is int type
    

    That is because, while creating the udf, you have specified StringType:

    findClosestPreviousDateUdf = udf(findClosestPreviousDate,StringType())
    

    Hence I have modified the findClosestPreviousDate function as below; please change it if necessary.

    >>> in_dates = ['2017-01-26', '2017-02-26', '2017-04-17']
    >>>
    >>> def findClosestPreviousDate(currdate, date_list=in_dates):
    ...     date_format = "%Y-%m-%d"
    ...     currdate = datetime.datetime.strptime(currdate, date_format)
    ...     date_list = [datetime.datetime.strptime(x, date_format) for x in date_list if x != None]
    ...     diff = map(lambda dt: (currdate - dt).days, date_list)
    ...     closestDate = min(filter(lambda days_diff: days_diff <= 0, diff))
    ...     return closestDate if closestDate else 0
    ...
    >>> findClosestPreviousDate('2017-01-06')
    -101
    

    Also make the return type of the udf IntegerType. With these modifications the code works, but please validate whether the changes are correct. Note that a PySpark udf cannot be passed a plain Python list as an argument directly; there is a workaround, refer to PySpark - Pass list as parameter to UDF (a sketch of that idea follows the demo below).

    >>> df.show()
    +----------+
    |      date|
    +----------+
    |2017-01-06|
    |2017-01-08|
    +----------+
    
    >>>
    >>> in_dates = ['2017-01-26', '2017-02-26', '2017-04-17']
    >>> def findClosestPreviousDate(currdate, date_list=in_dates):
    ...     date_format = "%Y-%m-%d"
    ...     currdate = datetime.datetime.strptime(currdate, date_format)
    ...     date_list = [datetime.datetime.strptime(x, date_format) for x in date_list if x != None]
    ...     diff = map(lambda dt: (currdate - dt).days, date_list)
    ...     closestDate = min(filter(lambda days_diff: days_diff <= 0, diff))
    ...     return closestDate if closestDate else 0
    ...
    >>> findClosestPreviousDate('2017-01-06')
    -101
    >>>
    >>> from pyspark.sql.types import IntegerType
    >>> findClosestPreviousDateUDF = udf(findClosestPreviousDate, IntegerType())
    >>> df.withColumn('closest_date', findClosestPreviousDateUDF(df['date'])).show()
    +----------+------------+
    |      date|closest_date|
    +----------+------------+
    |2017-01-06|        -101|
    |2017-01-08|         -99|
    +----------+------------+
    

    Hope this helps!

  • 0

    I figured out the problem. Here is my modified UDF:

    def findClosestPreviousDate(currdate, date_str):
        date_format = "%Y-%m-%d"
        currdate = datetime.datetime.strptime(currdate, date_format)
        date_list = ''
        result = currdate
        # guard against null input up front: hand the None straight back instead of iterating over it
        if date_str is None:
            return date_str
        else:
            date_list = date_str.split('|')
        date_list = [datetime.datetime.strptime(x, date_format) for x in date_list if x != None]
        lowestdiff = 10000
        for dt in date_list:
            if(dt >= currdate):
                continue
            delta = currdate-dt
            diff = delta.days
            if(diff < lowestdiff):
                lowestdiff = diff
                result = dt
        dlt = currdate-result
        return dlt.days
    

    The NoneType error was due to null values getting passed into the UDF as parameters, which I already knew. What I was wondering is why the nulls were not being filtered out even when I used the isNotNull() function.

    I tried both of the following:

    findClosestPreviousDateUdf = udf(findClosestPreviousDate,StringType())
    grouped_extend_df2 = grouped_extend_email_rec.withColumn('recency_eng', func.when(size(col("activity_arr")) > 0, findClosestPreviousDateUdf("expanded_datestr", "activity_arr")).otherwise(0))
    

    findClosestPreviousDateUdf = udf(findClosestPreviousDate,StringType())
    grouped_extend_df2 = grouped_extend_email_rec.withColumn('recency_eng', func.when(col("activity_arr").isNotNull(), findClosestPreviousDateUdf("expanded_datestr", "activity_arr")).otherwise(0))
    

    But when I handled the NoneType inside the Python function findClosestPreviousDate() itself, as below,

    if date_str is None:
        return date_str
    else:
        date_list = date_str.split('|')
    

    it worked.
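
    A likely explanation (my note, not part of the original answer): Spark SQL does not guarantee the evaluation order of sub-expressions, so a Python UDF referenced inside when(...).otherwise(...) or behind an isNotNull() check may still be invoked on rows where the guard is false, which is why the recommended fix is exactly the null check inside the UDF shown above. A generic way to apply that check without editing each function is a small wrapper; null_safe_udf below is an illustrative name, not an existing API.

    # Hedged sketch: a wrapper that returns None whenever any argument is None,
    # so the wrapped function never sees NoneType values no matter how Spark
    # evaluates the surrounding when/otherwise expression.
    from pyspark.sql.functions import udf
    from pyspark.sql.types import IntegerType

    def null_safe_udf(f, return_type):
        def wrapper(*args):
            if any(a is None for a in args):
                return None  # skip rows with missing inputs
            return f(*args)
        return udf(wrapper, return_type)

    findClosestPreviousDateUdf = null_safe_udf(findClosestPreviousDate, IntegerType())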
