在Spark程序中为数十亿条记录(> 25亿)实现ROW_NUMBER(序列生成器)的最佳方法是什么?

示例代码:

select patient_id, 
department_id, 
row_number() over (partition by department_id order by dept_id asc) as Pat_serial_Nbr
from T_patient;

Spark程序中的Row_number()运行超过4小时,并且失败达到15亿条记录 .

我已经尝试了RDD方法zipWithIndex()15亿条记录(执行需要40分钟),它正在返回预期的结果 .

public RDD<scala.Tuple2<T,Object>> zipWithIndex()

用它的元素索引来拉开这个RDD . 排序首先基于分区索引,然后是每个分区内的项目顺序 . 因此,第一个分区中的第一个项目获得索引0,最后一个分区中的最后一个项目获得最大索引 . 这类似于Scala的zipWithIndex,但它使用Long而不是Int作为索引类型 . 当此RDD包含多个分区时,此方法需要触发spark作业 . 请注意,某些RDD(例如groupBy()返回的RDD)不保证分区中元素的顺序 . 因此,不保证分配给每个元素的索引,如果重新评估RDD,甚至可能会改变 . 如果需要固定排序来保证相同的索引分配,则应使用sortByKey()对RDD进行排序或将其保存到文件中 .

scala> List("X", "Y", "Z").zipWithIndex
res0: List[(String, Int)] = List((X,0), (Y,1), (Z,2))

数据框的工作原理相同 - val rdd = df.rdd.zipWithIndex

第二种选择:

val df_id = df.withColumn("id",monotonicallyIncreasingId)

参考:Spark-Monotonically increasing id not working as expected in dataframe?

Could you please suggest the optimal way to generate sequence numbers in Scala Spark.