Defining a UDAF causes a Spark SQL error

I'm using Spark 1.6.0 on AWS EMR, through a Zeppelin notebook.

I defined a UDAF with the following code:

import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.MutableAggregationBuffer
import org.apache.spark.sql.expressions.UserDefinedAggregateFunction

import java.text.SimpleDateFormat
import java.util.Date

class AggregateTS extends UserDefinedAggregateFunction {
    def inputSchema: StructType = StructType(StructField("input", StringType) :: Nil)

    def bufferSchema: StructType = StructType(StructField("intermediate", StringType) :: Nil)

    def dataType: DataType = StringType

    def deterministic: Boolean = true

    def initialize(buffer: MutableAggregationBuffer): Unit = {
        buffer(0) = "Init"
    }

    def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
        if (buffer.getAs[String](0) == "Init") {
            buffer(0) = input.getAs[String](0)
        }
        else {
            // combine the incoming timestamp string with the running buffer
            // (average_ts is a helper defined elsewhere, not shown here)
            buffer(0) = average_ts(input.getAs[String](0), buffer.getAs[String](0))
        }
    }

    def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
        buffer1(0) = average_ts(buffer1.getAs[String](0), buffer2.getAs[String](0))
    }

    def evaluate(buffer: Row): Any = {
        buffer.getAs[String](0)
    }
}
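
For context, once the class compiles, a UDAF like this would typically be registered and invoked as below (a sketch only; the table events, the DataFrame df, and the columns key and ts are hypothetical names, not from the question):

// Register the UDAF under a name usable from SQL (Spark 1.6 sqlContext API)
val aggTS = new AggregateTS
sqlContext.udf.register("agg_ts", aggTS)
sqlContext.sql("SELECT key, agg_ts(ts) FROM events GROUP BY key")

// Or call the instance directly in a DataFrame aggregation
df.groupBy("key").agg(aggTS(df("ts")))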

When I compile this class, I get the following error:

error: not found: type DataType
       def dataType: DataType = StringType

What does this mean?

1 Answer

Solved it myself. It seems to have been an import collision; I changed the import to an explicit one:

import org.apache.spark.sql.types.{DataType}
    

It worked after that.
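
Note that the class also references StringType, StructType, StructField, Row, MutableAggregationBuffer, and UserDefinedAggregateFunction, so if the wildcard import is dropped entirely rather than kept alongside the explicit one, a full explicit import set would look roughly like this:

// explicit imports for every Spark SQL type the UDAF above uses
import org.apache.spark.sql.types.{DataType, StringType, StructField, StructType}
import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}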
