首页 文章

为什么来自Kafka的读取流失败并且“无法找到存储在数据集中的类型的编码器”?

提问于
浏览
2

我正在尝试将Spark Structured Streaming与Kafka一起使用 .

object StructuredStreaming {

  def main(args: Array[String]) {
    if (args.length < 2) {
      System.err.println("Usage: StructuredStreaming <hostname> <port>")
      System.exit(1)
    }

    val host = args(0)
    val port = args(1).toInt

    val spark = SparkSession
      .builder
      .appName("StructuredStreaming")
      .config("spark.master", "local")
      .getOrCreate()

    import spark.implicits._

    // Subscribe to 1 topic
    val lines = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9093")
      .option("subscribe", "sparkss")
      .load()
    lines.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
      .as[(String, String)]
    }
}

我从Spark文档中获取了代码,并且遇到了这个构建错误:

无法找到存储在数据集中的类型的编码器 . 导入spark.implicits支持原始类型(Int,String等)和产品类型(case类) . 在将来的版本中将添加对序列化其他类型的支持 . .as [(String,String)]

我在其他SO帖子上读到,这是由于缺乏 import spark.implicits._ . 但它对我没有任何改变 .

UPDATE

<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <slf4j.version>1.7.12</slf4j.version>
    <spark.version>2.1.0</spark.version>
    <scala.version>2.10.4</scala.version>
    <scala.binary.version>2.10</scala.binary.version>
</properties>

<dependencies>

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.10</artifactId>
        <version>2.1.0</version>
    </dependency>

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.10</artifactId>
        <version>2.1.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql-kafka-0-10_2.10</artifactId>
        <version>2.1.0</version>
    </dependency>
</dependencies>

1 回答

  • 0

    好吧,我尝试使用scala 2.11.8

    <scala.version>2.11.8</scala.version>
    <scala.binary.version>2.11</scala.binary.version>
    
    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.1.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>2.1.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql-kafka-0-10_2.11</artifactId>
            <version>2.1.0</version>
        </dependency>
    
    </dependencies>
    

    并且具有相应的依赖关系(对于scala 2.11)并且它最终起作用 .

    Warning :您需要在intelliJ上重启项目,我认为在更改版本而不重新启动时存在一些问题,错误仍然存在 .

相关问题