首页 文章

如何从流式查询中编写镶木地板文件?

提问于
浏览
1

我正在使用Spark 2.2结构化流式传输来读取CSV文件 . 我将查询结果写入控制台是这样的:

val consoleQuery = exceptions
  .withWatermark("time", "5 years") 
  .groupBy(window($"time", "1 hour"), $"id")
  .count()
  .writeStream
  .format("console")
  .option("truncate", value = false)
  .trigger(Trigger.ProcessingTime(10.seconds))
  .outputMode(OutputMode.Complete())

结果看起来很好:

+---------------------------------------------+-------------+-----+
|window                                       |id           |count|
+---------------------------------------------+-------------+-----+
|[2017-02-17 09:00:00.0,2017-02-17 10:00:00.0]|EXC0000000001|1    |
|[2017-02-17 09:00:00.0,2017-02-17 10:00:00.0]|EXC0000000002|8    |
|[2017-02-17 08:00:00.0,2017-02-17 09:00:00.0]|EXC2200002   |1    |
+---------------------------------------------+-------------+-----+

但是当它写入Parquet文件时

val parquetQuery = exceptions
  .withWatermark("time", "5 years")
  .groupBy(window($"time", "1 hour"), $"id")
  .count()
  .coalesce(1)
  .writeStream
  .format("parquet")
  .option("path", "src/main/resources/parquet")
  .trigger(Trigger.ProcessingTime(10.seconds))
  .option("checkpointLocation", "src/main/resources/checkpoint")
  .outputMode(OutputMode.Append())

和另一份工作一起阅读,

val data = spark.read.parquet("src/main/resources/parquet/")

结果是这样的:

+------+---+-----+
|window|id |count|
+------+---+-----+
+------+---+-----+

1 回答

  • 0

    TL;DR parquetQuery 已启动 not ,因此没有来自流式查询的输出 .

    查看 parquetQuery 的类型,它是org.apache.spark.sql.streaming.DataStreamWriter,它只是在某个时候应该启动的查询的描述 . 由于它不是,查询从来没有能够做任何写入流的事情 .

    parquetQuery 声明的最后添加 start (在调用链之后或作为调用链的一部分) .

    val parquetQuery = exceptions
      .withWatermark("time", "5 years")
      .groupBy(window($"time", "1 hour"), $"id")
      .count()
      .coalesce(1)
      .writeStream
      .format("parquet")
      .option("path", "src/main/resources/parquet")
      .trigger(Trigger.ProcessingTime(10.seconds))
      .option("checkpointLocation", "src/main/resources/checkpoint")
      .outputMode(OutputMode.Append())
      .start // <-- that's what you miss
    

相关问题