我一直在寻找试图找到问题所在的年龄 .

我有一个pyspark数据帧并按如下方式对其进行分区:

data.registerTempTable('data')
query = """
        SELECT *
        From data
        DISTRIBUTE BY id1, id2, id3
        SORT BY id1, id2, id3, date
    """           
data = self.sql(query, store=False)

然后我想将一些代码应用于分区,

data_list = data.rdd.mapPartitions(lambda x:func_1(form_set(x)))

在内部功能我做

def form_set(partition):
    tracker_index =0
    for row in partition:
        tracker_index+=1
        ##some actions##
        print("id1:{}, id2:{}, id3:{}, ind:{}".format(id1,id2,id3,tracker_index)
    return partition

然后是一个只接受form_set输出的外部函数

def func_1(partition):
    #actions#
    return partition

函数中的动作依赖于正确分区中的数据,即具有相同id1,id2和id3的所有数据应该在同一分区中 . 但是,当我查看打印日志时,它会显示出来

id1:1, id2:2, id3:3, ind:1
id1:1, id2:2, id3:3, ind:1
id1:2, id2:2, id3:3, ind:2
id1:1, id2:2, id3:3, ind:1
id1:2, id2:2, id3:3, ind:2

而我已经预料到了

id1:1, id2:2, id3:3, ind:1
id1:1, id2:2, id3:3, ind:2
id1:2, id2:2, id3:3, ind:1
id1:1, id2:2, id3:3, ind:3
id1:2, id2:2, id3:3, ind:2

我真的迷失了如何解决这个问题 . 我对pyspark相当新,但我一直在寻找很长一段时间,但找不到答案 . 任何帮助将不胜感激 . 谢谢