首页 文章

Kafka 流过滤问题

提问于
浏览
1

我正在尝试运行以下示例中的基本应用程序:

https://github.com/confluentinc/examples/blob/3.3.x/kafka-streams/src/main/scala/io/confluent/examples/streams/MapFunctionScalaExample.scala

但是我在这一行得到了一个例外:

// Variant 1: using `mapValues`
val uppercasedWithMapValues: KStream[Array[Byte], String] = textLines.mapValues(_.toUpperCase())

Error:(33,25)扩展函数缺少参数类型((x$1)=> x$1.toUpperCase()) textLines.mapValues(_.toUpperCase())

如果我将光标悬停在代码上,我会收到错误:

类型不匹配,预期:ValueMapper [5],实际:(任何)=>任何无法解析符号 toUpperCase

我的 sbt 文件的内容:

name := "untitled1"

version := "0.1"

scalaVersion := "2.11.11"

// https://mvnrepository.com/artifact/org.apache.kafka/kafka_2.11
libraryDependencies += "org.apache.kafka" % "kafka_2.11" % "0.11.0.0"

// https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients
libraryDependencies += "org.apache.kafka" % "kafka-clients" % "0.11.0.0"

// https://mvnrepository.com/artifact/org.apache.kafka/kafka-streams
libraryDependencies += "org.apache.kafka" % "kafka-streams" % "0.11.0.0"

// https://mvnrepository.com/artifact/org.apache.kafka/connect-api
libraryDependencies += "org.apache.kafka" % "connect-api" % "0.11.0.0"

我真的不确定如何处理,因为我对 Scala 很新。我想知道问题是什么以及如何解决它。

1 回答

相关问题