我正在尝试运行以下示例中的基本应用程序:
但是我在这一行得到了一个例外:
// Variant 1: using `mapValues`
val uppercasedWithMapValues: KStream[Array[Byte], String] = textLines.mapValues(_.toUpperCase())
Error:(33,25)扩展函数缺少参数类型((x$1)=> x$1.toUpperCase()) textLines.mapValues(_.toUpperCase())
如果我将光标悬停在代码上,我会收到错误:
类型不匹配,预期:ValueMapper [5],实际:(任何)=>任何无法解析符号 toUpperCase
我的 sbt 文件的内容:
name := "untitled1"
version := "0.1"
scalaVersion := "2.11.11"
// https://mvnrepository.com/artifact/org.apache.kafka/kafka_2.11
libraryDependencies += "org.apache.kafka" % "kafka_2.11" % "0.11.0.0"
// https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients
libraryDependencies += "org.apache.kafka" % "kafka-clients" % "0.11.0.0"
// https://mvnrepository.com/artifact/org.apache.kafka/kafka-streams
libraryDependencies += "org.apache.kafka" % "kafka-streams" % "0.11.0.0"
// https://mvnrepository.com/artifact/org.apache.kafka/connect-api
libraryDependencies += "org.apache.kafka" % "connect-api" % "0.11.0.0"
我真的不确定如何处理,因为我对 Scala 很新。我想知道问题是什么以及如何解决它。
1 回答
来自http://docs.confluent.io/current/streams/faq.html#scala-compile-error-no-type-parameter-java-defined-trait-is-invariant-in-type-t
要解决此问题,您需要在 Scala 应用程序中显式声明类型,以便编译代码。例如,您可能需要断开将多个 DSL 操作链接到多个语句的单个语句,其中每个语句显式声明相应的返回类型。 StreamToTableJoinScalaIntegrationTest 演示了如何显式声明返回变量的类型。
****更新
Kafka 2.0(将于 6 月发布)包含一个适当的 Scala API,可以避免这些问题。比较https://cwiki.apache.org/confluence/display/KAFKA/KIP-270 - Kafka Streams 的 Scala Wrapper 库