在我的Spark Structured Streaming代码中,我从Kafka接收用户操作,并且需要在Parquet(追加模式)中存储per_user_and_15_min_window的操作计数 .
事件时间戳与现实不符,它们可能属于过去,并且可能无序到达某一点 .
数据流是这样的:我可能会收到2017年2月的数据,之后可能是2016年11月的数据,但是一旦我收到了用户ID和期间(15分钟)的数据,我希望水印功能关闭聚合(并将其附加到Parquet)如果在X分钟超时(或通过其他类型的触发器)内没有收到该对userID_period的更多数据 .
码:
def main(args: Array[String]) {
//Create Spark Session
val spark = SparkSession.builder
.master("local[2]")
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", ipaddress_port)
.option("subscribe", "calls-topic")
.load()
.select(from_json(col("value").cast("string"), schema).as("user_record")).select("user_record.*")
.appName("LearningStructuredStreaming")
.getOrCreate()
这是聚合查询
//Write aggregation query
val aggregationQuery = df.withColumn("MyDateTime", from_unixtime((col("attributes.START_TIME")/1000), "yyyy-MM-dd HH:mm").cast(TimestampType))
.withWatermark("MyDateTime", "15 minutes")
.groupBy(window(col("MyDateTime"), "15 minutes"), col("attributes.USER_ID"))
.agg(count("attributes.OPERATION_ID").as("#Operations"))
.writeStream
.format("parquet")
.option("path", "C:/output_parquet")
.option("checkpointLocation", "C:/checkpoint")
.outputMode("append")
.start()
上面代码的输出如下所示(从Parquet输出文件夹中读取):
+------------------------------------------+------------+------------+
|window |USER_ID |#Operations |
+------------------------------------------+------------+------------+
|[2017-07-27 17:00:00.0,2017-07-27 17:15.. |User1 |321 |
|[2017-07-27 17:00:00.0,2017-07-27 17:15.. |User2 |571 |
|[2017-07-27 17:00:00.0,2017-07-27 17:15.. |User3 |272 |
|[2017-07-27 17:15:00.0,2017-07-27 17:30.. |User1 |351 |
|[2017-07-27 17:15:00.0,2017-07-27 17:30.. |User2 |491 |
|[2017-07-27 17:15:00.0,2017-07-27 17:30.. |User3 |277 |
1 回答
我取得了一些进展,但仍然没有 .
这里我添加一个新列引用当前时间,这是我将用于水印的列 .
输出看起来像这样:
这个解决方案允许我处理过去的数据并将其附加到Parquet,无论数据到达的顺序如何 .
但是,我仍然在努力将数据强制附加到Parquet后立即被Spark处理 . 由于水印,保持状态并且在处理更多批次之前不会输出数据,这是不稳定的,因为这引入了不必要的延迟 .
你有更好的方法吗?