
UnaryTransformer instance throws ClassCastException


I need to create my own UnaryTransformer instance that accepts a DataFrame column of type Array[String] and outputs a column of the same type. When I try to do this on Spark 2.1.0, I get a ClassCastException. I have put together a sample test that demonstrates my case.

import org.apache.spark.SparkConf
import org.apache.spark.ml.UnaryTransformer
import org.apache.spark.ml.util.Identifiable
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{ArrayType, DataType, StringType}

class MyTransformer(override val uid:String) extends UnaryTransformer[Array[String],Array[String],MyTransformer] {
  override protected def createTransformFunc: (Array[String]) => Array[String] = {
    param1 => {
      param1.foreach(println(_))
      param1
    }
  }

  override protected def outputDataType: DataType = ArrayType(StringType)

  override protected def validateInputType(inputType: DataType): Unit = {
    require(inputType == ArrayType(StringType), s"Data type mismatch between Array[String] and provided type $inputType.")
  }

  def this() = this( Identifiable.randomUID("tester") )
}


object Tester {



  def main(args: Array[String]): Unit = {

    val config = new SparkConf().setAppName("Tester")

    implicit val sparkSession = SparkSession.builder().config(config).getOrCreate()
    import sparkSession.implicits._

    val dataframe = Seq(Array("Firstly", "F1"), Array("Driving", "S1"), Array("Ran", "T3"),
      Array("Fourth", "F4"), Array("Running", "F5"), Array("Gone", "S6")).toDF("input")



    val transformer = new MyTransformer().setInputCol("input").setOutputCol("output")

    val transformed = transformer.transform(dataframe)

    transformed.select("output").show()

    println("Complete....")

    sparkSession.close()


  }

}

Attaching the stack trace for reference:

Exception in thread "main" org.apache.spark.SparkException: Failed to execute user defined function($anonfun$createTransformFunc$1: (array) => array)
    at org.apache.spark.sql.catalyst.expressions.ScalaUDF.eval(ScalaUDF.scala:1072)
    at org.apache.spark.sql.catalyst.expressions.Alias.eval(namedExpressions.scala:144)
    at org.apache.spark.sql.catalyst.expressions.InterpretedProjection.apply(Projection.scala:48)
    at org.apache.spark.sql.catalyst.expressions.InterpretedProjection.apply(Projection.scala:30)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.immutable.List.foreach(List.scala:392)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.immutable.List.map(List.scala:296)
    at org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation$$anonfun$apply$21.applyOrElse(Optimizer.scala:1078)
    at org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation$$anonfun$apply$21.applyOrElse(Optimizer.scala:1073)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:288)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:288)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:287)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:293)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:293)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:331)
    at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:188)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformChildren(TreeNode.scala:329)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:293)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:293)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:293)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:331)
    at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:188)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformChildren(TreeNode.scala:329)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:293)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:277)
    at org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation$.apply(Optimizer.scala:1073)
    at org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation$.apply(Optimizer.scala:1072)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:85)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:82)
    at scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:57)
    at scala.collection.IndexedSeqOptimized$class.foldLeft(IndexedSeqOptimized.scala:66)
    at scala.collection.mutable.WrappedArray.foldLeft(WrappedArray.scala:35)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:82)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:74)
    at scala.collection.immutable.List.foreach(List.scala:392)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:74)
    at org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:73)
    at org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:73)
    at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:79)
    at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:75)
    at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:84)
    at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:84)
    at org.apache.spark.sql.Dataset.withTypedCallback(Dataset.scala:2791)
    at org.apache.spark.sql.Dataset.head(Dataset.scala:2112)
    at org.apache.spark.sql.Dataset.take(Dataset.scala:2327)
    at org.apache.spark.sql.Dataset.showString(Dataset.scala:248)
    at org.apache.spark.sql.Dataset.show(Dataset.scala:636)
    at org.apache.spark.sql.Dataset.show(Dataset.scala:595)
    at org.apache.spark.sql.Dataset.show(Dataset.scala:604)
    at Tester$.main(Tester.scala:45)
    at Tester.main(Tester.scala)
Caused by: java.lang.ClassCastException: scala.collection.mutable.WrappedArray$ofRef cannot be cast to [Ljava.lang.String;
    at MyTransformer$$anonfun$createTransformFunc$1.apply(Tester.scala:9)
    at org.apache.spark.sql.catalyst.expressions.ScalaUDF$$anonfun$2.apply(ScalaUDF.scala:89)
    at org.apache.spark.sql.catalyst.expressions.ScalaUDF$$anonfun$2.apply(ScalaUDF.scala:88)
    at org.apache.spark.sql.catalyst.expressions.ScalaUDF.eval(ScalaUDF.scala:1069)
    ... 53 more

1 Answer

  • 1

    ArrayType is represented as Seq, not Array:

    override protected def createTransformFunc: (Seq[String]) => Seq[String] = {
      param1 => {
        param1.foreach(println(_))
        param1
      }
    }
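
    Putting the whole fix together, a minimal sketch of the corrected class (keeping the names from the question) switches the UnaryTransformer type parameters to Seq[String] as well, so the generated UDF matches the WrappedArray that Spark passes in for an ArrayType column:

    import org.apache.spark.ml.UnaryTransformer
    import org.apache.spark.ml.util.Identifiable
    import org.apache.spark.sql.types.{ArrayType, DataType, StringType}

    class MyTransformer(override val uid: String)
      extends UnaryTransformer[Seq[String], Seq[String], MyTransformer] {

      def this() = this(Identifiable.randomUID("tester"))

      // Spark hands an ArrayType column to the UDF as a Seq (concretely a WrappedArray),
      // so the transform function must accept and return Seq[String], not Array[String].
      override protected def createTransformFunc: Seq[String] => Seq[String] = { param1 =>
        param1.foreach(println(_))
        param1
      }

      override protected def outputDataType: DataType = ArrayType(StringType)

      override protected def validateInputType(inputType: DataType): Unit = {
        require(inputType == ArrayType(StringType),
          s"Data type mismatch between ArrayType(StringType) and provided type $inputType.")
      }
    }

    The Tester object from the question should then run unchanged, since Seq(Array(...)).toDF("input") still produces an ArrayType(StringType) column.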
    
