我有以下pyspark应用程序,它从子/父进程id的csv生成子/父进程序列 . 考虑到树的问题,我使用从叶节点开始的迭代深度优先搜索(没有子节点的进程)并迭代我的文件来创建这些闭包,其中进程1是进程2的父进程,即进程3的父进程依此类推 .
换句话说,给定如下所示的csv,是否可以使用pyspark数据帧和适当的pyspark-isms来实现深度优先搜索(迭代或递归),以生成所述闭包而无需使用.collect()函数(难以置信的昂贵)?
from pyspark.sql.functions import monotonically_increasing_id
import copy
from pyspark.sql import SQLContext
from pyspark import SparkContext
class Test():
def __init__(self):
self.process_list = []
def main():
test = Test()
sc = SparkContext.getOrCreate()
sqlContext = SQLContext(sc)
df = sc.textFile("<path to csv>")
df = df.map(lambda line: line.split(","))
header = df.first()
data = df.filter(lambda row: row != header)
data = data.toDF(header)
data.createOrReplaceTempView("flat")
data = sqlContext.sql("select doc_process_pid, doc_parent_pid from flat
where doc_parent_pid is not null AND
doc_process_pid is not null")
data = data.select(monotonically_increasing_id().alias("rowId"), "*")
data.createOrReplaceTempView("data")
leaf_df = sqlContext.sql("select doc_process_pid, doc_parent_pid from data
where doc_parent_pid != -1 AND
doc_process_pid == -1")
leaf_df = leaf_df.rdd.collect()
data = sqlContext.sql("select doc_process_pid, doc_parent_pid from data
where doc_process_pid != -1")
data.createOrReplaceTempView("data")
for row in leaf_df:
path = []
rowID = row[0]
data = data.filter(data['rowId'] != rowID)
parentID = row[4]
path.append(parentID)
while (True):
next_df = sqlContext.sql(
"select doc_process_pid, doc_parent_pid from data where
doc_process_pid == " + str(parentID))
next_df_rdd = next_df.collect()
print("parent: ", next_df_rdd[0][1])
parentID = next_df_rdd[0][1]
if (int(parentID) != -1):
path.append(next_df_rdd[0][1])
else:
test.process_list.append(copy.deepcopy(path))
break
print("final: ", test.process_list)
main()
这是我的csv:
doc_process_pid doc_parent_pid
1 -1
2 1
6 -1
7 6
8 7
9 8
21 -1
22 21
24 -1
25 24
26 25
27 26
28 27
29 28
99 6
107 99
108 -1
109 108
222 109
1000 7
1001 1000
-1 9
-1 22
-1 29
-1 107
-1 1001
-1 222
-1 2
它表示子/父进程关系 . 如果我们将其视为树,则叶节点由doc_process_id == -1定义,根节点为doc_parent_process == -1的进程 .
上面的代码生成两个数据帧:
叶节点:
+---------------+--------------+
|doc_process_pid|doc_parent_pid|
+---------------+--------------+
| -1| 9|
| -1| 22|
| -1| 29|
| -1| 107|
| -1| 1001|
| -1| 222|
| -1| 2|
+---------------+--------------+
剩下的子/父进程没有叶子节点:
+---------------+--------------+
|doc_process_pid|doc_parent_pid|
+---------------+--------------+
| 1| -1|
| 2| 1|
| 6| -1|
| 7| 6|
| 8| 7|
| 9| 8|
| 21| -1|
| 22| 21|
| 24| -1|
| 25| 24|
| 26| 25|
| 27| 26|
| 28| 27|
| 29| 28|
| 99| 6|
| 107| 99|
| 108| -1|
| 109| 108|
| 222| 109|
| 1000| 7|
+---------------+--------------+
输出将是:
[[1, 2],
[6, 99, 107],
[6, 99, 7, 1000, 1001],
[6, 7, 1000, 8, 9],
[21, 22],
[24, 25, 26, 27, 28, 29],
[108, 109, 222]])
思考?虽然这有点具体,但我想强调执行深度优先搜索以生成以此DataFrame格式表示的序列闭包的一般化问题 .
在此先感谢您的帮助!
1 回答
我不认为pyspark是最好的语言 .
解决方案是遍历每次都将数据帧与自身连接的树节点级别 .
让我们创建我们的数据帧,不需要将它分成叶子和其他节点,我们只保留原始数据帧:
我们现在将从这棵树创建两个数据帧,一个将是我们的建筑基础,另一个将是我们的建筑砖块:
让我们为结构的步骤
i
定义一个函数:让我们定义我们的初始状态:
当一个分支完全构造时,我们将它从
df
中取出并放入df_end
:最后,df是空的,我们有
该算法的最坏情况是与最大分支长度一样多的连接 .