首页 文章

在Spark结构化流媒体中,如何将完整聚合输出到外部源(如REST服务)

提问于
浏览
1

我尝试执行的任务是聚合DataFrame中维度(字段)的值计数,执行一些统计信息,如average,max,min等,然后通过进行API调用将聚合输出到外部系统 . 我使用的是30秒的水印,窗口大小为10秒 . 我将这些尺寸做得很小,以便我更容易测试和调试系统 .

我发现进行API调用的唯一方法是使用 ForeachWriter . 我的问题是 ForeachWriter 在分区级别执行,并且每个分区只生成一个聚合 . 到目前为止,我还没有找到一种方法来获取累积聚合,而不是合并为1,这是一种减慢我的流应用程序的方法 .

我发现,如果我使用基于文件的接收器(如Parquet编写器)到HDFS,则代码会产生真正的聚合 . 它也表现得很好 . 我真正需要的是实现相同的结果,但调用API而不是写入文件系统 .

有谁知道如何做到这一点?

我已经尝试过使用Spark 2.2.2和Spark 2.3并获得相同的行为 .

这是一个简化的代码片段,用于说明我要做的事情:

val valStream = streamingDF
  .select(
    $"event.name".alias("eventName"),
    expr("event.clientTimestamp / 1000").cast("timestamp").as("eventTime"),
    $"asset.assetClass").alias("assetClass")
  .where($"eventName" === 'MyEvent')
  .withWatermark("eventTime", "30 seconds")
  .groupBy(window($"eventTime", "10 seconds", $"assetClass", $"eventName")
  .agg(count($"eventName").as("eventCount"))
  .select($"window.start".as("windowStart"), $"window.end".as("windowEnd"), $"assetClass".as("metric"), $"eventCount").as[DimAggregateRecord]
  .writeStream
  .option("checkpointLocation", config.checkpointPath)
  .outputMode(config.outputMode)

val session = (if(config.writeStreamType == AbacusStreamWriterFactory.S3) {
    valStream.format(config.outputFormat)
    .option("path", config.outputPath)
  }
  else {
    valStream.foreach(--- this is my DimAggregateRecord ForEachWriter ---)
  }).start()

1 回答

  • 1

    我回答了自己的问题 . 我发现按窗口开始时间重新分区就可以了 . 它会对数据进行混洗,以便具有相同组和windowStart时间的所有行都在同一个执行程序上 . 下面的代码为每个组窗口间隔生成一个文件 . 它也表现得很好 . 我没有确切的数字,但它产生的聚合时间少于窗口间隔10秒 .

    val valStream = streamingDF
      .select(
        $"event.name".alias("eventName"),
        expr("event.clientTimestamp / 1000").cast("timestamp").as("eventTime"),
        $"asset.assetClass").alias("assetClass")
      .where($"eventName" === 'MyEvent')
      .withWatermark("eventTime", "30 seconds")
      .groupBy(window($"eventTime", "10 seconds", $"assetClass", $"eventName")
      .agg(count($"eventName").as("eventCount"))
      .select($"window.start".as("windowStart"), $"window.end".as("windowEnd"), $"assetClass".as("metric"), $"eventCount").as[DimAggregateRecord]
    
      .repartition($"windowStart")  // <-------- this line produces the desired result
    
      .writeStream
      .option("checkpointLocation", config.checkpointPath)
      .outputMode(config.outputMode)
    
    val session = (if(config.writeStreamType == AbacusStreamWriterFactory.S3) {
        valStream.format(config.outputFormat)
        .option("path", config.outputPath)
      }
      else {
        valStream.foreach(--- this is my DimAggregateRecord ForEachWriter ---)
      }).start()
    

相关问题