首页 文章

Spark Map 创建需要很长时间

提问于
浏览
2

如下所示,

步骤1:使用groupBy对呼叫进行分组

//Now group the calls by the s_msisdn for call type 1
//grouped: org.apache.spark.rdd.RDD[(String, Iterable[(String, (Array[String], String))])] 
val groupedCallsToProcess = callsToProcess.groupBy(_._1)

第2步:映射分组的呼叫 .

//create a Map of the second element in the RDD, which is the callObject
//grouped: org.apache.spark.rdd.RDD[(String, Iterable[(String,(Array[String], String))])] 

val mapOfCalls = groupedCallsToProcess.map(f => f._2.toList)

第3步:映射到Row对象,其中 Map 将具有[CallsObject,msisdn]的键值对

val listOfMappedCalls = mapOfCalls.map(f => f.map(_._2).map(c => 
  Row(
      c._1(CallCols.call_date_hour),
      c._1(CallCols.sw_id),   
      c._1(CallCols.s_imsi),
      f.map(_._1).take(1).mkString
    )
  ))

当数据大小非常大时,如上所示的第3步似乎需要很长时间 . 我想知道是否有任何方法可以使第3步有效 . 非常感谢任何帮助 .

1 回答

  • 2

    在您的代码中有很多东西是非常昂贵的,而您实际上并不需要这些东西 .

    • 您在第一步中不需要 groupBy . groupBy 在Spark中非常昂贵 .

    • 整个第二步没用 . 由于GC开销很大, toList 非常昂贵 .

    • 在第三步中删除1个额外的 Map . 每个 map 都是map函数顺序的线性运算 .

    • 永远不要做像 f.map(_._1).take(1) 这样的事情 . 您正在转换整个列表,但只使用1(或5)元素 . 而是做 f.take(5).map(_._1) . 如果你只需要1 - f.head._1 .

    在讨论如何在不使用 groupBy 的情况下以不同的方式编写此代码之前,我们先修复此代码 .

    // you had this in start
    val callsToProcess: RDD[(String, (Array[String], String))] = ....
    
    // RDD[(String, Iterable[(String, (Array[String], String))])]
    val groupedCallsToProcess = callsToProcess
      .groupBy(_._1)
    
    // skip the second step
    
    val listOfMappedCalls = groupedCallsToProcess
      .map({ case (key, iter) => {
        // this is what you did
        // val iterHeadString = iter.head._1
        // but the 1st element in each tuple of iter is actually same as key
        // so
        val iterHeadString = key
        // or we can totally remove this iterHeadString and use key
        iter.map({ case (str1, (arr, str2)) => Row(
          arr(CallCols.call_date_hour),
          arr(CallCols.sw_id),   
          arr(CallCols.s_imsi),
          iterHeadString
        ) })
      } })
    

    但是......就像我说 groupBy 在Spark中非常昂贵 . 你的 callsToProcess 已经有了 RDD[(key, value)] . 所以我们可以直接使用 aggregateByKey . 此外,您可能会注意到 groupBy 除了将所有这些行放在列表中而不是直接放在内部和RDD之外的任何其他内容时都没用 .

    // you had this in start
    val callsToProcess: RDD[(String, (Array[String], String))] = ....
    
    // new lets map it to look like what you needed because we can 
    // totally do this without any grouping
    // I somehow believe that you needed this RDD[Row] and not RDD[List[Row]]
    // RDD[Row]
    val mapped = callsToProcess
      .map({ case (key, (arr, str)) => Row(
          arr(CallCols.call_date_hour),
          arr(CallCols.sw_id),   
          arr(CallCols.s_imsi),
          key
      ) })
    
    
    // Though I can not think of any reason of wanting this
    // But if you really needed that RDD[List[Row]] thing...
    // then keep the keys with your rows
    // RDD[(String, Row)]
    val mappedWithKey = callsToProcess
      .map({ case (key, (arr, str)) => (key, Row(
          arr(CallCols.call_date_hour),
          arr(CallCols.sw_id),   
          arr(CallCols.s_imsi),
          key
      )) })
    
    // now aggregate by the key to create your lists
    // RDD[List[Row]]
    val yourStrangeRDD = mappedWithKey
      .aggregateByKey(List[Row]())(
        (list, row) => row +: list, // prepend, do not append
        (list1, list2) => list1 ++ list2
      )
    

相关问题