我正在尝试从Spark流数据源读取数据,按事件时间窗口,然后在窗口数据上运行自定义Python函数(它使用非标准Python库) .
我的数据框看起来像这样:
| Time | Value |
| 2018-01-01 12:23:50.200 | 1234 |
| 2018-01-01 12:23:51.200 | 33 |
| 2018-01-01 12:23:53.200 | 998 |
| ... | ... |
窗口似乎与Spark SQL很好地协同工作,使用类似这样的东西:
windowed_df = df.groupBy(window("Time", "10 seconds"))
...,并且有一个关于windowing by event time in the Spark Structured Streaming docs的部分,所以我认为这应该适用于Spark Structured Streaming .
到现在为止还挺好 .
另外,我已经能够使用Spark Streaming(DStream)来应用我的自定义转换操作,该操作当前在传入流上运行(基本上,它假设数据是正确的窗口块,假设我试图摆脱的) . 代码看起来像这样:
def my_analysis(input_rdd):
# convert RDD to native types (would also be possible from a DataFrame)
# run through various Python libs
# construct new RDD with results - 1 row, multiple values (could construct new DataFrame here instead)
my_dstream\
.map(deserialize_from_string)\
.transform(my_analysis)\
.map(serialize_to_string)\
.foreachRDD(write_to_sink)
我现在基本上想要将两者结合起来,所以做的事情如下:
df\
.groupBy(window("Time", "10 seconds"))\
.transform(my_analysis)\ # how do I do this with pyspark.sql.group.GroupedData?
.writeStream # ...
# OR:
my_dstream\
.map(deserialize_from_string)\
.window_by_event_time("10 seconds")\ # how do I do this with a DStream?
.transform(my_analysis)\
.map(serialize_to_string)\
.foreachRDD(write_to_sink)
知道如何才能完成上述任务吗?
我试过的事情:
- 我可以在windowed_df上运行的函数看起来非常有限,基本上IPython建议我只能在这里进行聚合(
min
/max
/avg
/agg
与pyspark.sql.functions) .agg
似乎最有用,但到目前为止我在该领域找到的最好的是使用collect_list
,类似这样:
windowed_df.agg(collect_list("Value")).sort("window").show(20, False)
......但这意味着我失去了时间戳 .
PySpark不支持
- 自定义聚合函数(UDAF)(SPARK-10915)
我看过的其他事情:
-
Arbitrary Stateful Processing in Apache Spark’s Structured Streaming - mapGroupWithState听起来像它可能做我想要的(以及更多),但在PySpark中还没有 .
-
Spark: How to map Python with Scala or Java User Defined Functions? - 在这种情况下,在Scala / Java中编写UADF不是一个选项(我需要使用特定的Python库)
-
How to define UDAF over event-time windows in PySpark 2.1.0 - 相似但没有答案
-
Introducing Vectorized UDFs for PySpark - 这可能有效,使用"Grouped" UDF的"Ordinary Least Squares Linear Regression"示例看起来很有希望 . 但是,它需要Spark 2.3.0(我可以编译),JIRA ticket表示UADF显然是非目标(我不清楚UDAF和GUDF(?)的确切区别)