
Pyspark - depth-first search on a dataframe


I have the following pyspark application, which generates sequences of child/parent processes from a csv of child/parent process ids. Treating the problem as a tree, I use an iterative depth-first search starting from the leaf nodes (processes that have no children) and iterate through my file to create these closures, where process 1 is the parent of process 2, which is the parent of process 3, and so on.

In other words, given a csv like the one below, is it possible to use pyspark dataframes and the appropriate pyspark idioms to implement a depth-first search (iteratively or recursively) to generate said closures without using the .collect() function (which is incredibly expensive)?

from pyspark.sql.functions import monotonically_increasing_id
import copy
from pyspark.sql import SQLContext
from pyspark import SparkContext

class Test():
    def __init__(self):
        self.process_list = []

def main():

    test = Test()
    sc = SparkContext.getOrCreate()
    sqlContext = SQLContext(sc)
    df = sc.textFile("<path to csv>")
    df = df.map(lambda line: line.split(","))
    header = df.first()
    data = df.filter(lambda row: row != header)
    data = data.toDF(header)

    data.createOrReplaceTempView("flat")
    data = sqlContext.sql("""select doc_process_pid, doc_parent_pid from flat
                             where doc_parent_pid is not null AND
                             doc_process_pid is not null""")
    data = data.select(monotonically_increasing_id().alias("rowId"), "*")

    data.createOrReplaceTempView("data")
    # leaf nodes are the marker rows whose doc_process_pid is -1
    leaf_df = sqlContext.sql("""select rowId, doc_process_pid, doc_parent_pid from data
                                where doc_parent_pid != -1 AND
                                doc_process_pid == -1""")
    leaf_df = leaf_df.rdd.collect()
    data = sqlContext.sql("""select rowId, doc_process_pid, doc_parent_pid from data
                             where doc_process_pid != -1""")
    data.createOrReplaceTempView("data")

    for row in leaf_df:
        path = []
        rowID = row[0]
        data = data.filter(data['rowId'] != rowID)
        # refresh the view so the SQL below sees the filtered frame
        data.createOrReplaceTempView("data")
        parentID = row[2]
        path.append(parentID)
        while True:
            next_df = sqlContext.sql(
                """select doc_process_pid, doc_parent_pid from data
                   where doc_process_pid == """ + str(parentID))

            next_df_rdd = next_df.collect()
            print("parent: ", next_df_rdd[0][1])
            parentID = next_df_rdd[0][1]

            if int(parentID) != -1:
                path.append(next_df_rdd[0][1])
            else:
                test.process_list.append(copy.deepcopy(path))
                break

    print("final: ", test.process_list)


main()

Here is my csv:

doc_process_pid doc_parent_pid
   1             -1
   2              1
   6             -1
   7              6
   8              7
   9              8
   21            -1
   22            21
   24            -1
   25            24
   26            25
   27            26
   28            27
   29            28
   99             6
   107           99
   108           -1
   109          108
   222          109
   1000           7
   1001        1000
   -1             9
   -1            22
   -1            29
   -1           107
   -1          1001
   -1           222
   -1             2

It represents child/parent process relationships. If we treat this as a tree, the leaf nodes are defined by doc_process_pid == -1 and the root nodes are the processes whose doc_parent_pid == -1.

The code above produces two dataframes:

Leaf nodes:

+---------------+--------------+
|doc_process_pid|doc_parent_pid|
+---------------+--------------+
|             -1|             9|
|             -1|            22|
|             -1|            29|
|             -1|           107|
|             -1|          1001|
|             -1|           222|
|             -1|             2|
+---------------+--------------+

The remaining child/parent processes with the leaf nodes removed:

+---------------+--------------+
|doc_process_pid|doc_parent_pid|
+---------------+--------------+
|              1|            -1|
|              2|             1|
|              6|            -1|
|              7|             6|
|              8|             7|
|              9|             8|
|             21|            -1|
|             22|            21|
|             24|            -1|
|             25|            24|
|             26|            25|
|             27|            26|
|             28|            27|
|             29|            28|
|             99|             6|
|            107|            99|
|            108|            -1|
|            109|           108|
|            222|           109|
|           1000|             7|
+---------------+--------------+

The output would be:

[[1, 2], 
 [6, 99, 107], 
 [6, 99, 7, 1000, 1001], 
 [6, 7, 1000, 8, 9], 
 [21, 22], 
 [24, 25, 26, 27, 28, 29], 
 [108, 109, 222]]

Thoughts? While this is somewhat specific, I want to emphasize the generalized problem of performing a depth-first search to generate closures of sequences represented in this dataframe format.

Thanks in advance for your help!

1 Answer

  • 1

    I don't think pyspark is the best language to do this.

    The solution is to walk the tree level by level, joining the dataframe with itself each time.

    Let's create our dataframe; there is no need to split it into leaf and other nodes, we just keep the original dataframe:

    data = spark.createDataFrame(
        sc.parallelize([
            [1, -1], [2, 1], [6, -1], [7, 6], [8, 7], [9, 8],
            [21, -1], [22, 21], [24, -1], [25, 24], [26, 25], [27, 26],
            [28, 27], [29, 28], [99, 6], [107, 99], [108, -1], [109, 108],
            [222, 109], [1000, 7], [1001, 1000], [-1, 9], [-1, 22], [-1, 29],
            [-1, 107], [-1, 1001], [-1, 222], [-1, 2]
        ]),
        ["doc_process_pid", "doc_parent_pid"]
    )
    

    We'll now create two dataframes from this tree: one will be our building foundation and the other will be our building bricks:

    df1 = data.filter("doc_parent_pid = -1").select(data.doc_process_pid.alias("node"))
    df2 = data.select(data.doc_process_pid.alias("son"), data.doc_parent_pid.alias("node")).filter("node != -1")
    
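
    To make the two roles concrete: on the sample data, df1 holds the five root pids and df2 holds every (child, parent) edge. A quick sanity check, with the output written out by hand from the CSV above:

    df1.show()
        +----+
        |node|
        +----+
        |   1|
        |   6|
        |  21|
        |  24|
        | 108|
        +----+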

    Let's define a function for step i of the construction:

    def add_node(df, i):
        # climb one level down the tree: match the current node against the
        # parent column of df2, keep the matched pid as node<i>, and expose
        # the child as the new node for the next step
        return df.filter("node != -1") \
                 .join(df2, "node", "inner") \
                 .withColumnRenamed("node", "node" + str(i)) \
                 .withColumnRenamed("son", "node")
    
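
    To see what a single step does, here is a hand trace of the first call on the sample data (row order may differ when you actually run it): each root is paired with its children, the matched pid survives as node1, and the child becomes the new node:

    step1 = add_node(df1, 1)
    step1.show()
        +-----+----+
        |node1|node|
        +-----+----+
        |    1|   2|
        |    6|   7|
        |    6|  99|
        |   21|  22|
        |   24|  25|
        |  108| 109|
        +-----+----+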

    Let's define our initial state:

    from pyspark.sql.types import StructType, StructField, ArrayType, LongType
    df = df1
    i = 0
    df_end = spark.createDataFrame(
        sc.emptyRDD(), 
        StructType([StructField("branch", ArrayType(LongType()), True)])
    )
    
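
    A note on that empty frame: the explicit StructType is needed because spark.createDataFrame cannot infer a schema from an empty RDD, and branch must already be typed as an array of longs so that the union inside the loop below lines up.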

    Whenever a branch is fully built, we take it out of df and put it into df_end:

    import pyspark.sql.functions as psf
    while df.count() > 0:
        i = i + 1
        df = add_node(df, i)
        # finished branches end on the leaf marker node = -1;
        # reverse the columns so each branch reads root first
        df_end = df.filter("node = -1").drop('node').select(
            psf.array(*[c for c in reversed(df.columns) if c != "node"]).alias("branch")
        ).unionAll(df_end)
        df = df.filter("node != -1")
    
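
    A side note on API drift: from Spark 2.0 on, unionAll is deprecated in favour of union, and since count() plus the two filters re-evaluate the joined frame on every pass, caching it can avoid recomputation. A lightly adapted version of the same loop (a sketch, equivalent logic):

    import pyspark.sql.functions as psf
    while df.count() > 0:
        i = i + 1
        df = add_node(df, i).cache()  # scanned by both filters below
        finished = df.filter("node = -1").drop("node")
        df_end = finished.select(
            psf.array(*[c for c in reversed(finished.columns)]).alias("branch")
        ).union(df_end)
        df = df.filter("node != -1")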

    Finally, df is empty and we have:

    df_end.show(truncate=False)
        +------------------------+
        |branch                  |
        +------------------------+
        |[24, 25, 26, 27, 28, 29]|
        |[6, 7, 8, 9]            |
        |[6, 7, 1000, 1001]      |
        |[108, 109, 222]         |
        |[6, 99, 107]            |
        |[21, 22]                |
        |[1, 2]                  |
        +------------------------+
    

    The worst case for this algorithm is as many joins as the length of the longest branch.
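
    One extra caveat beyond the join count: each pass also lengthens the logical plan, so for very deep trees it can be worth truncating the lineage between iterations, for example with a checkpoint (the directory below is illustrative, not from the original answer):

    sc.setCheckpointDir("/tmp/spark-checkpoints")  # once, before the loop
    # then, inside the loop, replace df = add_node(df, i) with:
    df = add_node(df, i).checkpoint()  # materializes df and truncates the plan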
