首页 文章

有没有一种方法可以优化 Scala 中加入的 RDD 的分组?

提问于
浏览
2

我刚刚开始学习 Scala 和 Spark,这是我尝试加入销售商品和汽车的方法。 Sale items代表有关汽车销售的信息(carId,saleDate,cityofSale,价格)。 Cars只是一个包含汽车信息(carId,carName)的元组。 ReportItem是最终报告(carId,carName,saleDate,cityofSale,价格)。

该方法返回预期结果:ReportItems,它们具有给定 carId/saleDate 的最高价格。作为 Scala/Spark 的初学者,我可能会错过一些东西,所以我想征求意见。有没有一种方法可以从 Scala 机会的角度更优化地实施该方法。我有一个使用 reduceByKey()方法的想法。但是找不到在该方法中正确实现它的方法。如有任何建议和批评,我将不胜感激。

def getSales(sales: RDD[SaleItem], cars: RDD[(String, String)]): RDD[ReportItem] = {
        val mappedSales = sales.keyBy(_.carId)
        val mappedCars = cars.keyBy(_._1)
        mappedSales.join(mappedCars)
          .map({
            case (_, (saleItem, car)) => ReportItem(saleItem.carId, car._2, saleItem.saleDate, saleItem.city, saleItem.price)
          })
          .map(reportItem => ((reportItem.carId, reportItem.saleDate), reportItem))
          .groupByKey()
          .map({ case ((id, date), reportItem) => reportItem.maxBy(_.price) })
}

1 回答

  • 2

    是的,可以使用.reduceByKey()对其进行优化

    def getSales(sales: RDD[SaleItem], cars: RDD[(String, String)]): RDD[ReportItem] = {
            val mappedSales = sales.keyBy(_.carId)
            val mappedCars = cars.keyBy(_._1)
            mappedSales.join(mappedCars)
              .map({
                case (_, (saleItem, car)) => ReportItem(saleItem.carId, car._2, saleItem.saleDate, saleItem.city, saleItem.price)
              })
              .map(reportItem => ((reportItem.carId, reportItem.saleDate), reportItem))
              .reduceByKey((item1, item2) => if(item1.price > item2.price) item1 else item2) //item1 & item2 are of type: ReportItem
              .values
    }
    

    这是更有效的方法,因为对于给定的(carId, saleDate)对,我们只持有一个ReportItem,因为那时到处都有最大价格的ReportItem被解决。代码中的groupBy在内存中保留给定对的所有ReportItemIterable,并在末尾计算 max,这是一个很大的内存消耗,尤其是在数据偏斜的情况下。

相关问题