想象一下一个大型数据集(> 40GB镶木地板文件),其中包含数千个变量的值观测值作为三元组 (variable, timestamp, value) .
现在想一下您只对500个变量的子集感兴趣的查询 . 并且您想要检索特定时间点(观察窗口或时间范围)的变量的观察值(值 - >时间序列) . 这样有一个开始和结束时间 .
没有分布式计算(Spark),您可以像这样编写代码:
for var_ in variables_of_interest:
for incident in incidents:
var_df = df_all.filter(
(df.Variable == var_)
& (df.Time > incident.startTime)
& (df.Time < incident.endTime))
My question is: 如何使用Spark / PySpark做到这一点?我在考虑:
-
以某种方式使用变量加入事件并在之后过滤数据帧 .
-
广播事件数据帧并在过滤变量观测值(df_all)时在map-function中使用它 .
-
以某种方式使用RDD.cartasian或RDD.mapParitions(注释:镶木地板文件由变量分配保存) .
预期的输出应该是:
incident1 --> dataframe 1
incident2 --> dataframe 2
...
其中数据帧1包含所有变量及其在事件1和数据帧2的时间范围内的观测值,这些值在事件2的时间范围内 .
我希望你有这个主意 .
UPDATE
我尝试编写基于思路#1的解决方案和来自zero323给出的答案的代码 . 工作很顺利,但我想知道如何在最后一步中将其聚合/分组到事件中?我尝试为每个事件添加一个序号,但后来我在最后一步中遇到了错误 . 如果您可以查看和/或完成代码,那将会很酷 . 因此,我上传了示例数据和脚本 . 环境是Spark 1.4(PySpark):
-
变量值观测数据(77MB):parameters_sample.csv(把它放到HDFS)
-
Jupyter笔记本:nested_for_loop_optimized.ipynb
-
Python脚本:nested_for_loop_optimized.py
-
PDF脚本导出:nested_for_loop_optimized.pdf
1 回答
一般来说,只有第一种方法对我来说才合理 . 关于记录和分布数量的完全加入策略,但您可以创建顶级数据框:
只是
join
或对特定分区执行连接:
可选marking one side as small enough to be broadcasted .