首页 文章

具有文件源和文件接收器的Spark结构化流中出错

提问于
浏览
0

我的团队现在正在进入结构化流媒体领域 . 我对结构化流媒体比较陌生 .

我有一个要求

来源 - CSV
接收器 - JSON

环境详情:

集群:Spark 2.2.1
编程语言:Scala
构建工具:Gradle

范围:

我已经实现了这个简单的代码

val schema = StructType(
    Array(StructField("customer_id", StringType),
        StructField("name", StringType),
        StructField("pid", StringType),
        StructField("product_name", StringType)))

val fileData = spark.readStream
    .option("header", "true")
    .schema(schema)
    .csv(args(0))

然后我应用一个简单的聚合作为

// The actual business logic is more complex than this
val customerCount = fileData.groupBy("customer_id").count()

最后,写信给JSON

val query = customerCount
    .writeStream
    .format("json")
    .option("path", "src/main/resources/output/myjson_dir")
    .option("checkpointLocation", "src/main/resources/chkpoint_dir")
    .trigger(Trigger.ProcessingTime("10 seconds"))
    .start()

问题:

  • 当我使用 .format("console") 时,这可以正常工作 . 但是当我使用 .format("json") 时会引发异常 -

线程“main”中的异常org.apache.spark.sql.AnalysisException:当没有水印的流式DataFrames / DataSets上有流式聚合时,不支持追加输出模式;; Aggregate [customer_id#0],[customer_id#0,count(1)AS count#18L] - StreamingRelation DataSource(org.apache.spark.sql.SparkSession @ 4b56b031,csv,List(),Some(StructType(StructField)(customer_id) ,StringType,true),StructField(name,StringType,true),StructField(product_id,StringType,true),StructField(product_name,StringType,true))),List(),None,Map(header - > true,path - > / Users / Underwood / Documents / workspace / Spark_Streaming_Examples / src / main / resources / input),None),FileSource [/ Users / Underwood / Documents / workspace / Spark_Streaming_Examples / src / main / resources / input],[customer_id#0 ,名称#1,product_id#2,product_name#3,日期#4]在org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker $ .org $ apache $ spark $ sql $ catalyst $ analysis $ UnsupportedOperationChecker $$ throwError( UnsupportedOperationChecker.scala:297)

我试过的是 outputMode = "update"outputMode = "complete" 的其他组合 . 但这些也会引发错误 . 为什么会这样?这是预期的行为吗?如何将输出写入JSON接收器?

  • 以上Exception讨论了使用 watermarks . AFAIK,水印与Timestamp字段一起使用,但我的输入数据中没有时间戳或日期字段 . 如果我错了,请告诉我 . 如何在这里添加 watermark 有所作为?

  • 我的下一次尝试是编写自定义ForEachSink . 我指的是post . 但这对我也没有帮助 . 这里的问题是,我得到200个目录,每个目录中都有0字节文件 .

  • 如何在最终输出中选择非分组?在简单的批处理中,我通常通过将聚合DF与原始DF连接并选择所需的行来实现此目的 . 但结构化流媒体似乎不喜欢这种方法 . 这是我的示例代码段

val customerCount = fileData.groupBy("customer_id").count()
val finalDF = fileData.join(customerCount, Seq("customer_id"))
    .select("customer_id", "count", "product_name" )

如果我错过任何一个detials,请告诉我 .

1 回答

  • 0

    阅读官方的Spark Structured Streaming文档related to watermarks .

    基本上,当你聚合时,你必须设置 outputMode = "complete" ,因为在没有保留之前完成的处理(例如字数)的情况下附加新数据是没有意义的 .

    因此,您必须使用水印或窗口函数指定程序何时必须启动新聚合,以及何时数据太晚 .

    如果您没有带时间戳的列,则可以使用 now() 函数创建一个,这将是处理时间 .

    如果有任何不清楚或有疑问,请评论,我会更新我的答案 .

相关问题