我试图使用下面scala的kafka流是我的Java代码,它完全正常:
KStreamBuilder builder = new KStreamBuilder();
KStream<String, String> textLines = builder.stream("TextLinesTopic");
textLines.foreach((key,values) -> {
System.out.println(values);
});
KafkaStreams streams = new KafkaStreams(builder, config);
streams.start();
我的scala代码如下:
val builder = new KStreamBuilder
val textLines:KStream[String, String] = builder.stream("TextLinesTopic")
textLines.foreach((key,value)-> {
println(key)
})
val streams = new KafkaStreams(builder, config)
streams.start()
scala代码抛出编译错误 . 期望类型不匹配:找不到ForEachAction [> String,> String],Actual((any,any),Unit):找不到值键:值value
有没有人知道如何在scala中使用流API
2 回答
你的语法错了:) .
->
只是用于创建对的运算符,所以表达式有类型((Any,Any),Unit)因为编译器无法推断任何类型信息(并且缺少
key
和value
)如果您使用scala 2.12替换
->
与=>
应该解决问题,但如果您使用旧版本的scala,则必须显式实现java bifunction:您可以使用print方法直接打印kafkastream .
它将打印kafka流 . 您甚至可以通过将参数传递给print函数来打印键或值 .