我一直在寻找试图找到问题所在的年龄 .
我有一个pyspark数据帧并按如下方式对其进行分区:
data.registerTempTable('data')
query = """
SELECT *
From data
DISTRIBUTE BY id1, id2, id3
SORT BY id1, id2, id3, date
"""
data = self.sql(query, store=False)
然后我想将一些代码应用于分区,
data_list = data.rdd.mapPartitions(lambda x:func_1(form_set(x)))
在内部功能我做
def form_set(partition):
tracker_index =0
for row in partition:
tracker_index+=1
##some actions##
print("id1:{}, id2:{}, id3:{}, ind:{}".format(id1,id2,id3,tracker_index)
return partition
然后是一个只接受form_set输出的外部函数
def func_1(partition):
#actions#
return partition
函数中的动作依赖于正确分区中的数据,即具有相同id1,id2和id3的所有数据应该在同一分区中 . 但是,当我查看打印日志时,它会显示出来
id1:1, id2:2, id3:3, ind:1
id1:1, id2:2, id3:3, ind:1
id1:2, id2:2, id3:3, ind:2
id1:1, id2:2, id3:3, ind:1
id1:2, id2:2, id3:3, ind:2
而我已经预料到了
id1:1, id2:2, id3:3, ind:1
id1:1, id2:2, id3:3, ind:2
id1:2, id2:2, id3:3, ind:1
id1:1, id2:2, id3:3, ind:3
id1:2, id2:2, id3:3, ind:2
我真的迷失了如何解决这个问题 . 我对pyspark相当新,但我一直在寻找很长一段时间,但找不到答案 . 任何帮助将不胜感激 . 谢谢