I am using Structured Streaming (Spark 2.0.2) to consume Kafka messages. The messages are protobuf, parsed with ScalaPB. I am getting the following error. Please help..
Exception in thread "main" scala.ScalaReflectionException: &lt;none&gt; is not a term
	at scala.reflect.api.Symbols$SymbolApi$class.asTerm(Symbols.scala:199)
	at scala.reflect.internal.Symbols$SymbolContextApiImpl.asTerm(Symbols.scala:84)
	at org.apache.spark.sql.catalyst.ScalaReflection$class.constructParams(ScalaReflection.scala:811)
	at org.apache.spark.sql.catalyst.ScalaReflection$.constructParams(ScalaReflection.scala:39)
	at org.apache.spark.sql.catalyst.ScalaReflection$class.getConstructorParameters(ScalaReflection.scala:800)
	at org.apache.spark.sql.catalyst.ScalaReflection$.getConstructorParameters(ScalaReflection.scala:39)
	at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflection.scala:582)
	at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflection.scala:460)
	at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$9.apply(ScalaReflection.scala:592)
	at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$9.apply(ScalaReflection.scala:583)
	at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:252)
	at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:252)
	at scala.collection.immutable.List.foreach(List.scala:381)
	at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:252)
	at scala.collection.immutable.List.flatMap(List.scala:344)
	at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflection.scala:583)
	at org.apache.spark.sql.catalyst.ScalaReflection$.serializerFor(ScalaReflection.scala:425)
	at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.apply(ExpressionEncoder.scala:61)
	at org.apache.spark.sql.Encoders$.product(Encoders.scala:274)
	at org.apache.spark.sql.SQLImplicits.newProductEncoder(SQLImplicits.scala:47)
	at PersonConsumer$.main(PersonConsumer.scala:33)
	at PersonConsumer.main(PersonConsumer.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)
Here is my code:
object PersonConsumer {
  import org.apache.spark.rdd.RDD
  import com.trueaccord.scalapb.spark._
  import org.apache.spark.sql.{SQLContext, SparkSession}
  import com.example.protos.demo._

  def main(args: Array[String]) {
    def parseLine(s: String): Person =
      Person.parseFrom(
        org.apache.commons.codec.binary.Base64.decodeBase64(s))

    val spark = SparkSession.builder
      .master("local")
      .appName("spark session example")
      .getOrCreate()

    import spark.implicits._

    val ds1 = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "person")
      .load()

    val ds2 = ds1.selectExpr("CAST(value AS STRING)").as[String]
    val ds3 = ds2.map(str => parseLine(str)).createOrReplaceTempView("persons")
    val ds4 = spark.sqlContext.sql("select name from persons")

    val query = ds4.writeStream
      .outputMode("append")
      .format("console")
      .start()

    query.awaitTermination()
  }
}
2 Answers
The line defining val ds3 needs to change: you have to keep the mapped Dataset (i.e. convert it to a DataFrame/Dataset value) before registering it as a temporary table, rather than assigning the result of createOrReplaceTempView, which returns Unit.
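A minimal sketch of that fix, keeping the questioner's variable names (this is a fragment of the original streaming job, so it still assumes the surrounding SparkSession, spark.implicits._, and parseLine from the question):

```scala
// Keep the mapped Dataset as a value; createOrReplaceTempView returns Unit,
// so it must be called as a separate statement, not chained into the val.
val ds3 = ds2.map(str => parseLine(str))
ds3.createOrReplaceTempView("persons")

// Now the SQL query over the temp view works as before.
val ds4 = spark.sqlContext.sql("select name from persons")
```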
In the Person class, gender is an enum, and that is what caused this problem. After removing this field, it works fine. Below is the answer I got from Shixiong (Ryan) of Databricks.
The issue is "optional Gender gender = 3;". The generated class "Gender" is a trait, and Spark cannot know how to create a trait, so it is not supported. You can define a class that is supported by SQL Encoders and convert this generated class to the new class in parseLine.
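A sketch of that workaround. The Person and Gender definitions below are simplified stand-ins for the ScalaPB-generated classes (the real ones live in com.example.protos.demo._), and PersonRow and toRow are hypothetical names, but they show the idea: flatten the enum trait into a String so the case class only uses types Spark's SQL Encoders support.

```scala
// Simplified stand-ins for the ScalaPB-generated classes: the enum is a
// sealed trait, which Spark's reflection-based encoders cannot handle.
sealed trait Gender
object Gender {
  case object MALE extends Gender
  case object FEMALE extends Gender
}
case class Person(name: String, age: Int, gender: Gender)

// A plain case class containing only encoder-friendly types; the trait
// is replaced by the enum value's name as a String.
case class PersonRow(name: String, age: Int, gender: String)

def toRow(p: Person): PersonRow =
  PersonRow(p.name, p.age, p.gender.toString)
```

In the streaming job, parseLine would then return toRow(Person.parseFrom(...)), so the map produces PersonRow values that Spark can derive an encoder for.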