我目前正在使用Spark Structured Streaming制作原始日志数据聚合器 .

Inputstream由一个文本文件目录组成:

// == Input == //

val logsDF = spark.readStream
  .format("text")
  .option("maxFilesPerTrigger", 1)
  .load("input/*")

然后解析日志......

// == Parsing == //

val logsDF2 = ...

......并汇总

// == Aggregation == //

val windowedCounts = logsDF2
  .withWatermark("window_start", "15 minutes")
  .groupBy(
    col("window"),
    col("node")
  ).count()

当我使用“控制台”接收器时,一切正常:结果在控制台中通过浴池批量更新:

// == Output == //

val query = windowedCounts.writeStream
  .format("console")
  .outputMode("complete")
  .start()
  .awaitTermination()

现在我想将我的结果保存在一个独特的文件中(json,parquet,csv ..)

// == Output == //

val query = windowedCounts.writeStream
  .format("csv")
  .option("checkpointLocation", "checkpoint/")
  .start("output/")
  .awaitTermination()

但是它输出了400个空csv ...我怎样才能得到我在控制台中的结果?

非常感谢你 !