首页 文章

用reduceByKey()替换groupByKey()

提问于
浏览
2

这是here的后续问题 . 我正在尝试基于此implementation实现k-means . 它工作得很好,但我想用 reduceByKey() 替换 groupByKey() ,但我不确定(我现在不担心性能) . 这是相关的缩小代码:

val data = sc.textFile("dense.txt").map(
        t => (t.split("#")(0), parseVector(t.split("#")(1)))).cache()

val read_mean_centroids = sc.textFile("centroids.txt").map(
        t => (t.split("#")(0), parseVector(t.split("#")(1))))
var centroids = read_mean_centroids.takeSample(false, K, 42).map(x => x._2)
do {
    var closest = read_mean_centroids.map(p => (closestPoint(p._2, centroids), p._2))
    var pointsGroup = closest.groupByKey() // <-- THE VICTIM :)
    var newCentroids = pointsGroup.mapValues(ps => average(ps.toSeq)).collectAsMap()
    ..

请注意 println(newCentroids) 将给出:

Map (23 - >( - 6.269305E-4,-0.0011746404,-4.08004E-5),8 - >( - 5:108732E-4,7.336348E-4,-3.707591E-4),17 - >( - 0.0016383086,-0.0016974678,1.45 ..

println(closest)

MapPartitionsRDD [6]在 Map kmeans.scala:75

相关问题:Using reduceByKey in Apache Spark (Scala) .


一些documentation

def reduceByKey(func:(V,V)⇒V):RDD [(K,V)]

使用关联reduce函数合并每个键的值 .

def reduceByKey(func:(V,V)⇒V,numPartitions:Int):RDD [(K,V)]

使用关联reduce函数合并每个键的值 .

def reduceByKey(分区器:分区器,功能:(V,V)⇒V):RDD [(K,V)]

使用关联reduce函数合并每个键的值 .

def groupByKey():RDD [(K,Iterable [V])]

将RDD中每个键的值分组为单个序列 .

1 回答

  • 5

    您可以使用 aggregateByKey() (比 reduceByKey() 更自然)来计算 newCentroids

    val newCentroids = closest.aggregateByKey((Vector.zeros(dim), 0L))(
      (agg, v) => (agg._1 += v, agg._2 + 1L),
      (agg1, agg2) => (agg1._1 += agg2._1, agg1._2 + agg2._2)
    ).mapValues(agg => agg._1/agg._2).collectAsMap
    

    为此,您需要计算数据的维度,即 dim ,但您只需要执行一次 . 你可以使用像 val dim = data.first._2.length 这样的东西 .

相关问题