I am using Spark 2.2 Structured Streaming to read CSV files. I write the query results to the console like this:
val consoleQuery = exceptions
.withWatermark("time", "5 years")
.groupBy(window($"time", "1 hour"), $"id")
.count()
.writeStream
.format("console")
.option("truncate", value = false)
.trigger(Trigger.ProcessingTime(10.seconds))
.outputMode(OutputMode.Complete())
The results look fine:
+---------------------------------------------+-------------+-----+
|window |id |count|
+---------------------------------------------+-------------+-----+
|[2017-02-17 09:00:00.0,2017-02-17 10:00:00.0]|EXC0000000001|1 |
|[2017-02-17 09:00:00.0,2017-02-17 10:00:00.0]|EXC0000000002|8 |
|[2017-02-17 08:00:00.0,2017-02-17 09:00:00.0]|EXC2200002 |1 |
+---------------------------------------------+-------------+-----+
But when I write to Parquet files instead,
val parquetQuery = exceptions
.withWatermark("time", "5 years")
.groupBy(window($"time", "1 hour"), $"id")
.count()
.coalesce(1)
.writeStream
.format("parquet")
.option("path", "src/main/resources/parquet")
.trigger(Trigger.ProcessingTime(10.seconds))
.option("checkpointLocation", "src/main/resources/checkpoint")
.outputMode(OutputMode.Append())
and read them back in another job,
val data = spark.read.parquet("src/main/resources/parquet/")
the result is this:
+------+---+-----+
|window|id |count|
+------+---+-----+
+------+---+-----+
1 Answer
TL;DR
parquetQuery
has not been started, so there is no output from the streaming query. Check the type of
parquetQuery
: it is org.apache.spark.sql.streaming.DataStreamWriter, which is merely a description of a query that is supposed to be started at some point. Since it never was, the query was never able to write anything to the stream. Add start at the very end of the
parquetQuery
declaration (after, or as part of, the call chain).
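A minimal sketch of the corrected declaration, reusing the question's own chain (exceptions, paths, and trigger interval are taken from the question as-is); the only new calls are the trailing start(), which turns the DataStreamWriter into a running StreamingQuery, and the optional awaitTermination():

```scala
import org.apache.spark.sql.functions.window
import org.apache.spark.sql.streaming.{OutputMode, Trigger}
import scala.concurrent.duration._

// Same chain as before, but start() is now called, so the query
// actually runs and writes Parquet files under the given path.
val parquetQuery = exceptions
  .withWatermark("time", "5 years")
  .groupBy(window($"time", "1 hour"), $"id")
  .count()
  .coalesce(1)
  .writeStream
  .format("parquet")
  .option("path", "src/main/resources/parquet")
  .option("checkpointLocation", "src/main/resources/checkpoint")
  .trigger(Trigger.ProcessingTime(10.seconds))
  .outputMode(OutputMode.Append())
  .start() // returns a StreamingQuery instead of a DataStreamWriter

// Optional: block this thread so the application stays alive
// while the streaming query keeps processing new files.
parquetQuery.awaitTermination()
```

Note that in Append mode with a watermark, a windowed aggregate is only emitted once the watermark passes the end of the window, so with a "5 years" watermark the Parquet output will stay empty for a long time even after start() is called.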