我的团队现在正在进入结构化流媒体领域 . 我对结构化流媒体比较陌生 .
我有一个要求
来源 - CSV
接收器 - JSON
环境详情:
集群:Spark 2.2.1
编程语言:Scala
构建工具:Gradle
范围:
我已经实现了这个简单的代码
val schema = StructType(
Array(StructField("customer_id", StringType),
StructField("name", StringType),
StructField("pid", StringType),
StructField("product_name", StringType)))
val fileData = spark.readStream
.option("header", "true")
.schema(schema)
.csv(args(0))
然后我应用一个简单的聚合作为
// The actual business logic is more complex than this
val customerCount = fileData.groupBy("customer_id").count()
最后,写信给JSON
val query = customerCount
.writeStream
.format("json")
.option("path", "src/main/resources/output/myjson_dir")
.option("checkpointLocation", "src/main/resources/chkpoint_dir")
.trigger(Trigger.ProcessingTime("10 seconds"))
.start()
问题:
- 当我使用
.format("console")
时,这可以正常工作 . 但是当我使用.format("json")
时会引发异常 -
线程“main”中的异常org.apache.spark.sql.AnalysisException:当没有水印的流式DataFrames / DataSets上有流式聚合时,不支持追加输出模式;; Aggregate [customer_id#0],[customer_id#0,count(1)AS count#18L] - StreamingRelation DataSource(org.apache.spark.sql.SparkSession @ 4b56b031,csv,List(),Some(StructType(StructField)(customer_id) ,StringType,true),StructField(name,StringType,true),StructField(product_id,StringType,true),StructField(product_name,StringType,true))),List(),None,Map(header - > true,path - > / Users / Underwood / Documents / workspace / Spark_Streaming_Examples / src / main / resources / input),None),FileSource [/ Users / Underwood / Documents / workspace / Spark_Streaming_Examples / src / main / resources / input],[customer_id#0 ,名称#1,product_id#2,product_name#3,日期#4]在org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker $ .org $ apache $ spark $ sql $ catalyst $ analysis $ UnsupportedOperationChecker $$ throwError( UnsupportedOperationChecker.scala:297)
我试过的是 outputMode = "update"
和 outputMode = "complete"
的其他组合 . 但这些也会引发错误 . 为什么会这样?这是预期的行为吗?如何将输出写入JSON接收器?
-
以上Exception讨论了使用 watermarks . AFAIK,水印与Timestamp字段一起使用,但我的输入数据中没有时间戳或日期字段 . 如果我错了,请告诉我 . 如何在这里添加 watermark 有所作为?
-
我的下一次尝试是编写自定义ForEachSink . 我指的是post . 但这对我也没有帮助 . 这里的问题是,我得到200个目录,每个目录中都有0字节文件 .
-
如何在最终输出中选择非分组?在简单的批处理中,我通常通过将聚合DF与原始DF连接并选择所需的行来实现此目的 . 但结构化流媒体似乎不喜欢这种方法 . 这是我的示例代码段
val customerCount = fileData.groupBy("customer_id").count()
val finalDF = fileData.join(customerCount, Seq("customer_id"))
.select("customer_id", "count", "product_name" )
如果我错过任何一个detials,请告诉我 .
1 回答
阅读官方的Spark Structured Streaming文档related to watermarks .
基本上,当你聚合时,你必须设置
outputMode = "complete"
,因为在没有保留之前完成的处理(例如字数)的情况下附加新数据是没有意义的 .因此,您必须使用水印或窗口函数指定程序何时必须启动新聚合,以及何时数据太晚 .
如果您没有带时间戳的列,则可以使用
now()
函数创建一个,这将是处理时间 .如果有任何不清楚或有疑问,请评论,我会更新我的答案 .