CODE HERE...   
def getMeasure = udf((next: String) => {
  if (next.equals("[ArStart]")) "Reboot"
  else if (next.equals("[MONITOR]")) "Reload"
  else "Others"
})

val systemWindow = Window
  .partitionBy($"company", $"location", $"systemname")
  .orderBy($"timestamp", $"rn")

val tmp_dev_msg_1 = ds_state_2.where($"replaced_device_message".isNotNull)
val tmp_dev_msg_2 = tmp_dev_msg_1
  .withColumn("measure", getMeasure(lead($"replaced_device_message", 1).over(systemWindow)))
val test = tmp_dev_msg_2
  .select($"timestamp", $"rn", $"measure")
val ds_measure_1 = ds_state_2.join(test, Seq("timestamp", "rn"), "leftouter")

MORE CODE HERE

Without the join statement, this code runs perfectly. Printing the test dataset shows only entries where replaced_device_message is not null.

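When I run it with the join statement, I get a NullPointerException in the function getMeasure. But that should not be possible, because I am reading tmp_dev_msg_1($"replaced_device_message"), which cannot contain any null values.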
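One thing worth checking here: `lead(..., 1)` returns null for any row that has no following row in its window partition, so the UDF can receive null even when the column itself contains none. The sketch below is plain Scala (no Spark needed) and mirrors the branching of getMeasure in a null-tolerant form. The name `measureOf` is hypothetical, and mapping null to "Others" is an assumption, not necessarily the desired behavior.

```scala
// Plain-Scala sketch: same branching as getMeasure in the question,
// but safe when the input is null (as lead(...) yields for the last
// row of each window partition).
object MeasureSketch {
  // Hypothetical helper name; not part of the original code.
  def measureOf(next: String): String = Option(next) match {
    case Some("[ArStart]") => "Reboot"
    case Some("[MONITOR]") => "Reload"
    case _                 => "Others" // assumption: null is treated as "Others"
  }

  def main(args: Array[String]): Unit = {
    println(measureOf("[ArStart]")) // Reboot
    println(measureOf("[MONITOR]")) // Reload
    println(measureOf(null))        // Others, no NullPointerException
  }
}
```

If this turns out to be the cause, the same body could be wrapped in `udf(...)` in place of the `next.equals(...)` chain, since calling `.equals` on a null reference is what throws.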

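Edit: The faulty behavior shows up for entries whose timestamp is not unique. But as far as I understand, that should not be a problem, because I also have a row_number -> rn, which is unique.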

Error message:

org.apache.spark.SparkException: Failed to execute user defined function(anonfun$getMeasure$1$1: (string) => string)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:148)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
    at org.apache.spark.scheduler.Task.run(Task.scala:108)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NullPointerException
    at Stability$$anonfun$getMeasure$1$1.apply(Stability.scala:165)
    at Stability$$anonfun$getMeasure$1$1.apply(Stability.scala:164)

Any ideas?