首页 文章

如何使用PySpark进行嵌套for-each循环

提问于
浏览
5

想象一下一个大型数据集(> 40GB镶木地板文件),其中包含数千个变量的值观测值作为三元组 (variable, timestamp, value) .

现在想一下您只对500个变量的子集感兴趣的查询 . 并且您想要检索特定时间点(观察窗口或时间范围)的变量的观察值(值 - >时间序列) . 这样有一个开始和结束时间 .

没有分布式计算(Spark),您可以像这样编写代码:

for var_ in variables_of_interest:
    for incident in incidents:

        var_df = df_all.filter(
            (df.Variable == var_)
            & (df.Time > incident.startTime)
            & (df.Time < incident.endTime))

My question is: 如何使用Spark / PySpark做到这一点?我在考虑:

  • 以某种方式使用变量加入事件并在之后过滤数据帧 .

  • 广播事件数据帧并在过滤变量观测值(df_all)时在map-function中使用它 .

  • 以某种方式使用RDD.cartasian或RDD.mapParitions(注释:镶木地板文件由变量分配保存) .

预期的输出应该是:

incident1 --> dataframe 1
incident2 --> dataframe 2
...

其中数据帧1包含所有变量及其在事件1和数据帧2的时间范围内的观测值,这些值在事件2的时间范围内 .

我希望你有这个主意 .

UPDATE

我尝试编写基于思路#1的解决方案和来自zero323给出的答案的代码 . 工作很顺利,但我想知道如何在最后一步中将其聚合/分组到事件中?我尝试为每个事件添加一个序号,但后来我在最后一步中遇到了错误 . 如果您可以查看和/或完成代码,那将会很酷 . 因此,我上传了示例数据和脚本 . 环境是Spark 1.4(PySpark):

1 回答

  • 1

    一般来说,只有第一种方法对我来说才合理 . 关于记录和分布数量的完全加入策略,但您可以创建顶级数据框:

    ref = sc.parallelize([(var_, incident) 
        for var_ in variables_of_interest:
        for incident in incidents
    ]).toDF(["var_", "incident"])
    

    只是 join

    same_var = col("Variable") == col("var_")
    same_time = col("Time").between(
        col("incident.startTime"),
        col("incident.endTime")
    )
    
    ref.join(df.alias("df"), same_var &  same_time)
    

    或对特定分区执行连接:

    incidents_ = sc.parallelize([
       (incident, ) for incident in incidents
    ]).toDF(["incident"])
    
    for var_ in variables_of_interest:
        df = spark.read.parquet("/some/path/Variable={0}".format(var_))
        df.join(incidents_, same_time)
    

    可选marking one side as small enough to be broadcasted .

相关问题