首页 文章

无法找到存储在数据集中的类型的编码器 . 在火花结构流

提问于
浏览
1

我正在尝试火花网站上给出的火花结构化流媒体的例子,但它正在抛出错误

1. Unable to find encoder for type stored in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._ Support for serializing other types will be added in future releases.

2. not enough arguments for method as: (implicit evidence$2: org.apache.spark.sql.Encoder[data])org.apache.spark.sql.Dataset[data]. Unspecified value parameter evidence$2. val ds: Dataset[data] = df.as[data]

这是我的代码

import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types._
import org.apache.spark.sql.Encoders
object final_stream {
  def main(args: Array[String]): Unit = {

    val spark = SparkSession
          .builder()
          .appName("kafka-consumer")
          .master("local[*]")
          .getOrCreate()
        import spark.implicits._

        spark.sparkContext.setLogLevel("WARN")

    case class data(name: String, id: String)

    val df = spark
          .readStream
          .format("kafka")
          .option("kafka.bootstrap.servers", "172.21.0.187:9093")
          .option("subscribe", "test")
          .load()
    println(df.isStreaming)

    val ds: Dataset[data] = df.as[data]
    val value = ds.select("name").where("id > 10")




    value.writeStream
          .outputMode("append")
          .format("console")
          .start()
          .awaitTermination()

  }
}

任何有关如何使这项工作的帮助 . 我想要这样的最终输出我想要这样的输出

+-----+--------+
| name|    id
+-----+--------+
|Jacek|     1
+-----+--------+

2 回答

  • 1

    出错的原因是您正在处理来自Kafka的 Array[Byte] 并且没有匹配 data case类的字段 .

    scala> println(schema.treeString)
    root
     |-- key: binary (nullable = true)
     |-- value: binary (nullable = true)
     |-- topic: string (nullable = true)
     |-- partition: integer (nullable = true)
     |-- offset: long (nullable = true)
     |-- timestamp: timestamp (nullable = true)
     |-- timestampType: integer (nullable = true)
    

    将行 df.as[data] 更改为以下内容:

    df.
      select($"value" cast "string").
      map(value => ...parse the value to get name and id here...).
      as[data]
    

    我强烈建议使用 selectfunctions 对象来处理传入的数据 .

  • 0

    该错误是由于数据框中的列数与您的案例类不匹配造成的 .

    数据框中有 [topic, timestamp, value, key, offset, timestampType, partition]

    而您的案例类只有两列

    case class data(name: String, id: String)
    

    您可以将数据框的内容显示为

    val display = df.writeStream.format("console").start()
    

    睡几秒钟然后

    display.stop()
    

    并使用 option("startingOffsets", "earliest") 如上所述here

    然后根据您的数据创建一个案例类 .

    希望这可以帮助!

相关问题