我正在尝试火花网站上给出的火花结构化流媒体的例子,但它正在抛出错误
1. Unable to find encoder for type stored in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._ Support for serializing other types will be added in future releases.
2. not enough arguments for method as: (implicit evidence$2: org.apache.spark.sql.Encoder[data])org.apache.spark.sql.Dataset[data]. Unspecified value parameter evidence$2. val ds: Dataset[data] = df.as[data]
这是我的代码
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types._
import org.apache.spark.sql.Encoders
object final_stream {
def main(args: Array[String]): Unit = {
val spark = SparkSession
.builder()
.appName("kafka-consumer")
.master("local[*]")
.getOrCreate()
import spark.implicits._
spark.sparkContext.setLogLevel("WARN")
case class data(name: String, id: String)
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "172.21.0.187:9093")
.option("subscribe", "test")
.load()
println(df.isStreaming)
val ds: Dataset[data] = df.as[data]
val value = ds.select("name").where("id > 10")
value.writeStream
.outputMode("append")
.format("console")
.start()
.awaitTermination()
}
}
任何有关如何使这项工作的帮助 . 我想要这样的最终输出我想要这样的输出
+-----+--------+
| name| id
+-----+--------+
|Jacek| 1
+-----+--------+
2 回答
出错的原因是您正在处理来自Kafka的
Array[Byte]
并且没有匹配data
case类的字段 .将行
df.as[data]
更改为以下内容:我强烈建议使用
select
和functions
对象来处理传入的数据 .该错误是由于数据框中的列数与您的案例类不匹配造成的 .
数据框中有
[topic, timestamp, value, key, offset, timestampType, partition]
列而您的案例类只有两列
您可以将数据框的内容显示为
睡几秒钟然后
并使用
option("startingOffsets", "earliest")
如上所述here然后根据您的数据创建一个案例类 .
希望这可以帮助!