
Spark Task not serializable with lag Window function

I have noticed that Spark throws a "Task not serializable" exception when I call map() on a DataFrame after applying a Window function to it. Here is my code:

val hc: org.apache.spark.sql.hive.HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
import hc.implicits._
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
def f(): String = "test"
case class P(name: String, surname: String)
val lag_result: org.apache.spark.sql.Column = lag($"name", 1).over(Window.partitionBy($"surname"))
val lista: List[P] = List(P("N1", "S1"), P("N2", "S2"), P("N2", "S2"))
val data_frame: org.apache.spark.sql.DataFrame = hc.createDataFrame(sc.parallelize(lista))
data_frame.withColumn("lag_result", lag_result).map(x => f) // throws Task not serializable
//data_frame.withColumn("lag_result", lag_result).map{ case x => def f(): String = "test"; f }.collect // This works

Here is the stack trace:

org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2055)
    at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:324)
    at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:323)
    ... and more
Caused by: java.io.NotSerializableException: org.apache.spark.sql.Column
Serialization stack:
    - object not serializable (class: org.apache.spark.sql.Column, value: 'lag(name,1,null) windowspecdefinition(surname,UnspecifiedFrame))
    - field (class: $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC, name: lag_result, type: class org.apache.spark.sql.Column)
    ... and so on

1 Answer


    lag returns o.a.s.sql.Column, which is not serializable. The same applies to WindowSpec. In interactive mode these objects can be included as part of the closure for map:

    scala> import org.apache.spark.sql.expressions.Window
    import org.apache.spark.sql.expressions.Window
    
    scala> val df = Seq(("foo", 1), ("bar", 2)).toDF("x", "y")
    df: org.apache.spark.sql.DataFrame = [x: string, y: int]
    
    scala> val w = Window.partitionBy("x").orderBy("y")
    w: org.apache.spark.sql.expressions.WindowSpec = org.apache.spark.sql.expressions.WindowSpec@307a0097
    
    scala> val lag_y = lag(col("y"), 1).over(w)
    lag_y: org.apache.spark.sql.Column = 'lag(y,1,null) windowspecdefinition(x,y ASC,UnspecifiedFrame)
    
    scala> def f(x: Any) = x.toString
    f: (x: Any)String
    
    scala> df.select(lag_y).map(f _).first
    org.apache.spark.SparkException: Task not serializable
        at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
    ...
    Caused by: java.io.NotSerializableException: org.apache.spark.sql.expressions.WindowSpec
    Serialization stack:
        - object not serializable (class: org.apache.spark.sql.expressions.WindowSpec, value: org.apache.spark.sql.expressions.WindowSpec@307a0097)
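
    The non-serializability of these objects can also be checked directly with plain Java serialization, independent of Spark's ClosureCleaner (a small illustrative check, not part of the original answer; it only needs spark-sql on the classpath):

    import java.io.{ByteArrayOutputStream, ObjectOutputStream}
    import org.apache.spark.sql.expressions.Window
    import org.apache.spark.sql.functions._

    // Building the same lag Column does not require an active SparkContext.
    val lag_y = lag(col("y"), 1).over(Window.partitionBy("x").orderBy("y"))

    // Plain Java serialization fails the same way the closure serialization does.
    try {
      new ObjectOutputStream(new ByteArrayOutputStream()).writeObject(lag_y)
    } catch {
      case e: java.io.NotSerializableException =>
        println(s"not serializable: ${e.getMessage}") // org.apache.spark.sql.Column
    }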
    

    A simple solution is to mark both as transient:

    scala> @transient val w = Window.partitionBy("x").orderBy("y")
    w: org.apache.spark.sql.expressions.WindowSpec = org.apache.spark.sql.expressions.WindowSpec@7dda1470
    
    scala> @transient val lag_y = lag(col("y"), 1).over(w)
    lag_y: org.apache.spark.sql.Column = 'lag(y,1,null) windowspecdefinition(x,y ASC,UnspecifiedFrame)
    
    scala> df.select(lag_y).map(f _).first
    res1: String = [null]
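
    Applied to the question's own snippet, the same fix would look roughly like this (a sketch, assuming the same spark-shell session with sc and the HiveContext hc available, as in the question):

    import org.apache.spark.sql.expressions.Window
    import org.apache.spark.sql.functions._
    import hc.implicits._

    case class P(name: String, surname: String)
    def f(): String = "test"

    // Marking the Column as @transient keeps it out of the serialized closure;
    // the WindowSpec is built inline, so it is never stored in a field.
    @transient val lag_result = lag($"name", 1).over(Window.partitionBy($"surname"))

    val lista = List(P("N1", "S1"), P("N2", "S2"), P("N2", "S2"))
    val data_frame = hc.createDataFrame(sc.parallelize(lista))
    data_frame.withColumn("lag_result", lag_result).map(x => f()).collect()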
    
