首页 文章

根据 Key 的值过滤 RDD

提问于
浏览
3

我有两个RDD,它们包装了以下数组:

Array((3,Ken), (5,Jonny), (4,Adam), (3,Ben), (6,Rhonda), (5,Johny))

Array((4,Rudy), (7,Micheal), (5,Peter), (5,Shawn), (5,Aaron), (7,Gilbert))

我需要以一种方式设计代码,如果我提供的输入为 3,则需要返回

Array((3,Ken), (3,Ben))

如果输入为 6,则输出应为

Array((6,Rhonda))

我尝试过这样的事情:

val list3 = list1.union(list2)

list3.reduceByKey(_+_).collect   

list3.reduceByKey(6).collect

这些都不起作用,有人可以帮助我解决这个问题吗?

1 回答

  • 2

    鉴于以下情况,您必须定义自己

    // Provide you SparkContext and inputs here
    val sc: SparkContext = ???
    val array1: Array[(Int, String)] = ???
    val array2: Array[(Int, String)] = ???
    val n: Int = ???
    
    val rdd1 = sc.parallelize(array1)
    val rdd2 = sc.parallelize(array2)
    

    您可以使用unionfilter达成目标

    rdd1.union(rdd2).filter(_._1 == n)
    

    由于可能需要多次通过键过滤,因此将此功能封装在其自己的功能中是很有意义的。

    如果我们可以确保此功能可以对任何类型的键(不仅是Int)起作用,也将很有趣。

    您可以按以下方式在旧的RDD API 中表达此信息:

    def filterByKey[K, V](rdd: RDD[(K, V)], k: K): RDD[(K, V)] =
      rdd.filter(_._1 == k)
    

    您可以按以下方式使用它:

    val rdd = rdd1.union(rdd2)
    
    val filtered = filterByKey(rdd, n)
    

    让我们更详细地看一下这种方法。

    此方法允许filterByKeyRDD包含通用对,其中第一项的类型为K,第二类型的类型为V(根据键和值)。它还接受类型为K的键,该键将用于过滤RDD

    然后,您使用filter函数,该函数接受一个谓词(此函数从某种类型(在这种情况下为K-变为Boolean)),并确保结果RDD仅包含尊重该谓词的项目。

    我们还可以将函数的主体编写为:

    rdd.filter(pair => pair._1 == k)
    

    要么

    rdd.filter { case (key, value) => key == k }
    

    但是我们利用_通配符来表达一个事实,即我们要对该匿名函数的第一个(也是唯一一个)参数进行操作。

    要使用它,您首先parallelize RDD,在它们上调用union,然后使用您要过滤的数字调用filterByKey函数(如示例中所示)。

相关问题