我需要为包含许多列的数据表生成row_numbers的完整列表 .
在SQL中,这将如下所示:
select
key_value,
col1,
col2,
col3,
row_number() over (partition by key_value order by col1, col2 desc, col3)
from
temp
;
现在,让我们说在Spark中我有一个形式为(K,V)的RDD,其中V =(col1,col2,col3),所以我的条目就像
(key1, (1,2,3))
(key1, (1,4,7))
(key1, (2,2,3))
(key2, (5,5,5))
(key2, (5,5,9))
(key2, (7,5,5))
etc.
我想使用sortBy(),sortWith(),sortByKey(),zipWithIndex等命令对它们进行排序,并使用正确的row_number创建一个新的RDD .
(key1, (1,2,3), 2)
(key1, (1,4,7), 1)
(key1, (2,2,3), 3)
(key2, (5,5,5), 1)
(key2, (5,5,9), 2)
(key2, (7,5,5), 3)
etc.
(我不关心括号,所以表格也可以是(K,(col1,col2,col3,rownum))而不是)
我该怎么做呢?
这是我的第一次尝试:
val sample_data = Seq(((3,4),5,5,5),((3,4),5,5,9),((3,4),7,5,5),((1,2),1,2,3),((1,2),1,4,7),((1,2),2,2,3))
val temp1 = sc.parallelize(sample_data)
temp1.collect().foreach(println)
// ((3,4),5,5,5)
// ((3,4),5,5,9)
// ((3,4),7,5,5)
// ((1,2),1,2,3)
// ((1,2),1,4,7)
// ((1,2),2,2,3)
temp1.map(x => (x, 1)).sortByKey().zipWithIndex.collect().foreach(println)
// ((((1,2),1,2,3),1),0)
// ((((1,2),1,4,7),1),1)
// ((((1,2),2,2,3),1),2)
// ((((3,4),5,5,5),1),3)
// ((((3,4),5,5,9),1),4)
// ((((3,4),7,5,5),1),5)
// note that this isn't ordering with a partition on key value K!
val temp2 = temp1.???
另请注意,函数sortBy不能直接应用于RDD,但必须首先运行collect(),然后输出也不是RDD,而是数组
temp1.collect().sortBy(a => a._2 -> -a._3 -> a._4).foreach(println)
// ((1,2),1,4,7)
// ((1,2),1,2,3)
// ((1,2),2,2,3)
// ((3,4),5,5,5)
// ((3,4),5,5,9)
// ((3,4),7,5,5)
这里有一些进展,但仍未分区:
val temp2 = sc.parallelize(temp1.map(a => (a._1,(a._2, a._3, a._4))).collect().sortBy(a => a._2._1 -> -a._2._2 -> a._2._3)).zipWithIndex.map(a => (a._1._1, a._1._2._1, a._1._2._2, a._1._2._3, a._2 + 1))
temp2.collect().foreach(println)
// ((1,2),1,4,7,1)
// ((1,2),1,2,3,2)
// ((1,2),2,2,3,3)
// ((3,4),5,5,5,4)
// ((3,4),5,5,9,5)
// ((3,4),7,5,5,6)
3 回答
row_number() over (partition by ... order by ...)
功能已添加到Spark 1.4中 . 这个答案使用PySpark / DataFrames .创建一个测试DataFrame:
添加分区的行号:
这是你提出的一个有趣的问题 . 我将用Python回答它,但我相信你能够无缝地翻译成Scala .
以下是我将如何解决它:
1-简化您的数据:
temp2现在是一个“真正的”键值对 . 它看起来像这样:
]
2-然后,使用分组依据功能重现PARTITION BY的效果:
temp3现在是一个包含2行的RDD:
3-现在,您需要为RDD的每个值应用等级函数 . 在python中,我会使用简单的排序函数(枚举将创建你的row_number列):
请注意,要实现您的特定订单,您需要提供正确的“key”参数(在python中,我只需创建一个lambda函数,如下所示:
最后(没有键参数函数,它看起来像那样):
]
希望有所帮助!
祝好运 .
test:Seq [(String,(Int,Int,Int))] = List((key1,(1,2,3)),(key1,(4,5,6)),(key2,(7,8) ,9)),(key2,(0,1,2)))
(KEY1,(1,2,3))
(KEY1,(4,5,6))
(KEY2,(7,8,9))
(KEY2,(0,1,2))
rdd:org.apache.spark.rdd.RDD [(String,(Int,Int,Int))] = ParallelCollectionRDD [41]并行化:26
rdd1:org.apache.spark.rdd.RDD [(String,Array [((Int,Int,Int),Int)])] = MapPartitionsRDD [44] at map at:25
rdd2:org.apache.spark.rdd.RDD [(String,(Int,Int,Int),Int)] = MapPartitionsRDD [45] at flatMap at:25
(KEY1,(1,2,3),0)
(KEY1,(4,5,6),1)
(KEY2,(0,1,2),0)
(KEY2,(7,8,9),1)