我使用Spark结构化流来读取Kafka主题的记录;我打算在Spark readstream
中计算每个'Micro batch'中收到的记录数
这是一个片段:
val kafka_df = sparkSession
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host:port")
.option("subscribe", "test-count")
.load()
我从文档中了解到,当启动 streamingQuery
(下一步)时,kafka_df会被懒惰地评估,并且在评估时,它会持有一个微批 . 所以,我想在主题上做一个 groupBy
,然后 count
应该工作 .
像这样:
val counter = kafka_df
.groupBy("topic")
.count()
现在要评估所有这些,我们需要一个streaminQuery,比方说,一个控制台接收器查询在控制台上打印它 . 这就是我看到问题的地方 . aggregate
DataFrame上的streamingQuery,例如 kafka_df
仅适用于 outputMode
complete/update 而不适用于 append .
这实际上意味着,streamingQuery报告的计数是累积的 .
像这样:
val counter_json = counter.toJSON //to jsonify
val count_query = counter_json
.writeStream.outputMode("update")
.format("console")
.start() // kicks of lazy evaluation
.awaitTermination()
在受控设置中,其中:
实际发布的记录:1500
实际收到的微批次:3
aActual收到的记录:1500
每个微量补丁的计数应该是 500 ,所以我希望(希望)查询打印到控制台:
主题:测试计数:500个主题:测试计数:500个主题:测试计数:500
但事实并非如此 . 它实际上打印:
主题:测试计数:500主题:测试计数:1000主题:测试计数:1500
据我所知是因为'outputMode'完成/更新(累计)
我的问题:是否有可能准确地获得每个微批次的计数是Spark-Kafka结构化流媒体?
从文档中,我发现了水印方法(支持追加):
val windowedCounts = kafka_df
.withWatermark("timestamp", "10 seconds")
.groupBy(window($"timestamp", "10 seconds", "10 seconds"), $"topic")
.count()
val console_query = windowedCounts
.writeStream
.outputMode("append")
.format("console")
.start()
.awaitTermination()
但是这个 console_query
的结果是不准确的,并且显得有点偏离 .
TL; DR - 任何关于准确计算Spark-Kafka微批次记录的想法都将受到赞赏 .
1 回答
如果您只想使用Kafka在结构化流应用程序中使用每个触发器处理特定数量的记录,请使用选项
maxOffsetsPerTrigger