首页 文章

Spark结构化流媒体Kafka Microbatch计数

提问于
浏览
1

我使用Spark结构化流来读取Kafka主题的记录;我打算在Spark readstream 中计算每个'Micro batch'中收到的记录数

这是一个片段:

val kafka_df = sparkSession
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host:port")
  .option("subscribe", "test-count")
  .load()

我从文档中了解到,当启动 streamingQuery (下一步)时,kafka_df会被懒惰地评估,并且在评估时,它会持有一个微批 . 所以,我想在主题上做一个 groupBy ,然后 count 应该工作 .

像这样:

val counter = kafka_df
             .groupBy("topic")
             .count()

现在要评估所有这些,我们需要一个streaminQuery,比方说,一个控制台接收器查询在控制台上打印它 . 这就是我看到问题的地方 . aggregate DataFrame上的streamingQuery,例如 kafka_df 仅适用于 outputMode complete/update 而不适用于 append .

这实际上意味着,streamingQuery报告的计数是累积的 .

像这样:

val counter_json = counter.toJSON   //to jsonify 
 val count_query = counter_json
                   .writeStream.outputMode("update")
                   .format("console")
                   .start()          // kicks of lazy evaluation
                   .awaitTermination()

在受控设置中,其中:
实际发布的记录:1500
实际收到的微批次:3
aActual收到的记录:1500

每个微量补丁的计数应该是 500 ,所以我希望(希望)查询打印到控制台:

主题:测试计数:500个主题:测试计数:500个主题:测试计数:500

但事实并非如此 . 它实际上打印:

主题:测试计数:500主题:测试计数:1000主题:测试计数:1500

据我所知是因为'outputMode'完成/更新(累计)

我的问题:是否有可能准确地获得每个微批次的计数是Spark-Kafka结构化流媒体?

从文档中,我发现了水印方法(支持追加):

val windowedCounts = kafka_df
                    .withWatermark("timestamp", "10 seconds")
                    .groupBy(window($"timestamp", "10 seconds", "10       seconds"), $"topic")
                    .count()

 val console_query = windowedCounts
                    .writeStream
                    .outputMode("append")
                    .format("console")
                    .start()
                    .awaitTermination()

但是这个 console_query 的结果是不准确的,并且显得有点偏离 .

TL; DR - 任何关于准确计算Spark-Kafka微批次记录的想法都将受到赞赏 .

1 回答

  • 1

    如果您只想使用Kafka在结构化流应用程序中使用每个触发器处理特定数量的记录,请使用选项 maxOffsetsPerTrigger

    val kafka_df = sparkSession
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "host:port")
      .option("subscribe", "test-count")
      .option("maxOffsetsPerTrigger", 500)
      .load()
    

相关问题