首页 文章

Kafka与Scala一起流

提问于
浏览
1

我试图使用下面scala的kafka流是我的Java代码,它完全正常:

KStreamBuilder builder = new KStreamBuilder();
    KStream<String, String> textLines = builder.stream("TextLinesTopic");
    textLines.foreach((key,values) -> {
        System.out.println(values);
    });

    KafkaStreams streams = new KafkaStreams(builder, config);
    streams.start();

我的scala代码如下:

val builder = new KStreamBuilder
  val textLines:KStream[String, String]  = builder.stream("TextLinesTopic")
  textLines.foreach((key,value)-> {
   println(key)
  })

  val streams = new KafkaStreams(builder, config)
  streams.start()

scala代码抛出编译错误 . 期望类型不匹配:找不到ForEachAction [> String,> String],Actual((any,any),Unit):找不到值键:值value

有没有人知道如何在scala中使用流API

2 回答

  • 1

    你的语法错了:) . -> 只是用于创建对的运算符,所以表达式

    (key,value)-> {
      println(key)
    }
    

    有类型((Any,Any),Unit)因为编译器无法推断任何类型信息(并且缺少 keyvalue

    如果您使用scala 2.12替换 ->=> 应该解决问题,但如果您使用旧版本的scala,则必须显式实现java bifunction:

    textLines.foreach(new BiFunction[T1, T2] { ... })
    
  • 5

    您可以使用print方法直接打印kafkastream .

    textlines.print

    它将打印kafka流 . 您甚至可以通过将参数传递给print函数来打印键或值 .

相关问题