如下所示,
步骤1:使用groupBy对呼叫进行分组
//Now group the calls by the s_msisdn for call type 1
//grouped: org.apache.spark.rdd.RDD[(String, Iterable[(String, (Array[String], String))])]
val groupedCallsToProcess = callsToProcess.groupBy(_._1)
第2步:映射分组的呼叫 .
//create a Map of the second element in the RDD, which is the callObject
//grouped: org.apache.spark.rdd.RDD[(String, Iterable[(String,(Array[String], String))])]
val mapOfCalls = groupedCallsToProcess.map(f => f._2.toList)
第3步:映射到Row对象,其中 Map 将具有[CallsObject,msisdn]的键值对
val listOfMappedCalls = mapOfCalls.map(f => f.map(_._2).map(c =>
Row(
c._1(CallCols.call_date_hour),
c._1(CallCols.sw_id),
c._1(CallCols.s_imsi),
f.map(_._1).take(1).mkString
)
))
当数据大小非常大时,如上所示的第3步似乎需要很长时间 . 我想知道是否有任何方法可以使第3步有效 . 非常感谢任何帮助 .
1 回答
在您的代码中有很多东西是非常昂贵的,而您实际上并不需要这些东西 .
您在第一步中不需要
groupBy
.groupBy
在Spark中非常昂贵 .整个第二步没用 . 由于GC开销很大,
toList
非常昂贵 .在第三步中删除1个额外的 Map . 每个
map
都是map函数顺序的线性运算 .永远不要做像
f.map(_._1).take(1)
这样的事情 . 您正在转换整个列表,但只使用1(或5)元素 . 而是做f.take(5).map(_._1)
. 如果你只需要1 -f.head._1
.在讨论如何在不使用
groupBy
的情况下以不同的方式编写此代码之前,我们先修复此代码 .但是......就像我说
groupBy
在Spark中非常昂贵 . 你的callsToProcess
已经有了RDD[(key, value)]
. 所以我们可以直接使用aggregateByKey
. 此外,您可能会注意到groupBy
除了将所有这些行放在列表中而不是直接放在内部和RDD之外的任何其他内容时都没用 .