我刚刚开始学习 Scala 和 Spark,这是我尝试加入销售商品和汽车的方法。 Sale items
代表有关汽车销售的信息(carId,saleDate,cityofSale,价格)。 Cars
只是一个包含汽车信息(carId,carName)的元组。 ReportItem
是最终报告(carId,carName,saleDate,cityofSale,价格)。
该方法返回预期结果:ReportItems
,它们具有给定 carId/saleDate 的最高价格。作为 Scala/Spark 的初学者,我可能会错过一些东西,所以我想征求意见。有没有一种方法可以从 Scala 机会的角度更优化地实施该方法。我有一个使用 reduceByKey()方法的想法。但是找不到在该方法中正确实现它的方法。如有任何建议和批评,我将不胜感激。
def getSales(sales: RDD[SaleItem], cars: RDD[(String, String)]): RDD[ReportItem] = {
val mappedSales = sales.keyBy(_.carId)
val mappedCars = cars.keyBy(_._1)
mappedSales.join(mappedCars)
.map({
case (_, (saleItem, car)) => ReportItem(saleItem.carId, car._2, saleItem.saleDate, saleItem.city, saleItem.price)
})
.map(reportItem => ((reportItem.carId, reportItem.saleDate), reportItem))
.groupByKey()
.map({ case ((id, date), reportItem) => reportItem.maxBy(_.price) })
}
1 回答
是的,可以使用
.reduceByKey()
对其进行优化这是更有效的方法,因为对于给定的
(carId, saleDate)
对,我们只持有一个ReportItem
,因为那时到处都有最大价格的ReportItem
被解决。代码中的groupBy
在内存中保留给定对的所有ReportItem
的Iterable
,并在末尾计算 max,这是一个很大的内存消耗,尤其是在数据偏斜的情况下。