我正在尝试从Spark流数据源读取数据,按事件时间窗口,然后在窗口数据上运行自定义Python函数(它使用非标准Python库) .

我的数据框看起来像这样:

| Time                    | Value |
| 2018-01-01 12:23:50.200 | 1234  |
| 2018-01-01 12:23:51.200 |   33  |
| 2018-01-01 12:23:53.200 |  998  |
|           ...           |  ...  |

窗口似乎与Spark SQL很好地协同工作,使用类似这样的东西:

windowed_df = df.groupBy(window("Time", "10 seconds"))

...,并且有一个关于windowing by event time in the Spark Structured Streaming docs的部分,所以我认为这应该适用于Spark Structured Streaming .

到现在为止还挺好 .

另外,我已经能够使用Spark Streaming(DStream)来应用我的自定义转换操作,该操作当前在传入流上运行(基本上,它假设数据是正确的窗口块,假设我试图摆脱的) . 代码看起来像这样:

def my_analysis(input_rdd):
    # convert RDD to native types (would also be possible from a DataFrame)
    # run through various Python libs
    # construct new RDD with results - 1 row, multiple values (could construct new DataFrame here instead)

my_dstream\
    .map(deserialize_from_string)\
    .transform(my_analysis)\
    .map(serialize_to_string)\
    .foreachRDD(write_to_sink)

我现在基本上想要将两者结合起来,所以做的事情如下:

df\
    .groupBy(window("Time", "10 seconds"))\
    .transform(my_analysis)\  # how do I do this with pyspark.sql.group.GroupedData?
    .writeStream  # ...

# OR:

my_dstream\
    .map(deserialize_from_string)\
    .window_by_event_time("10 seconds")\  # how do I do this with a DStream?
    .transform(my_analysis)\
    .map(serialize_to_string)\
    .foreachRDD(write_to_sink)

知道如何才能完成上述任务吗?

我试过的事情:

  • 我可以在windowed_df上运行的函数看起来非常有限,基本上IPython建议我只能在这里进行聚合( min / max / avg / aggpyspark.sql.functions) . agg 似乎最有用,但到目前为止我在该领域找到的最好的是使用 collect_list ,类似这样:
windowed_df.agg(collect_list("Value")).sort("window").show(20, False)

......但这意味着我失去了时间戳 .

PySpark不支持

我看过的其他事情: