我目前正在使用Spark Structured Streaming制作原始日志数据聚合器 .
Inputstream由一个文本文件目录组成:
// == Input == //
val logsDF = spark.readStream
.format("text")
.option("maxFilesPerTrigger", 1)
.load("input/*")
然后解析日志......
// == Parsing == //
val logsDF2 = ...
......并汇总
// == Aggregation == //
val windowedCounts = logsDF2
.withWatermark("window_start", "15 minutes")
.groupBy(
col("window"),
col("node")
).count()
当我使用“控制台”接收器时,一切正常:结果在控制台中通过浴池批量更新:
// == Output == //
val query = windowedCounts.writeStream
.format("console")
.outputMode("complete")
.start()
.awaitTermination()
现在我想将我的结果保存在一个独特的文件中(json,parquet,csv ..)
// == Output == //
val query = windowedCounts.writeStream
.format("csv")
.option("checkpointLocation", "checkpoint/")
.start("output/")
.awaitTermination()
但是它输出了400个空csv ...我怎样才能得到我在控制台中的结果?
非常感谢你 !