首页 文章

RDD/Scala 从 RDD 中获取一列

提问于
浏览
0

我有一个RDD[Log]文件,其中包含多个字段(username,content,date,bytes),并且我想为每个 field/column 查找不同的内容。

例如,我想获取 RDD 中的 min/max 和平均字节。当我做:

val q1 = cleanRdd.filter(x => x.bytes != 0)

我得到 RDD 的完整行,其字节数!=0。但是如何实际求和,计算平均值,找到 min/max 等?如何仅从 RDD 中获取一列并对其进行转换?

编辑:普拉萨德告诉我有关更改类型数据帧时,他就如何使虽然没有做说明,而我不能在网站上找到了坚实的答案。任何帮助都会很棒。

编辑:日志类:

case class Log (username: String, date: String, status: Int, content: Int)

使用 cleanRdd.take(5).foreach(println)会得到这样的结果

Log(199.72.81.55 ,01/Jul/1995:00:00:01 -0400,200,6245)
Log(unicomp6.unicomp.net ,01/Jul/1995:00:00:06 -0400,200,3985)
Log(199.120.110.21 ,01/Jul/1995:00:00:09 -0400,200,4085)
Log(burger.letters.com ,01/Jul/1995:00:00:11 -0400,304,0)
Log(199.120.110.21 ,01/Jul/1995:00:00:11 -0400,200,4179)

2 回答

  • 2

    好吧...您有很多问题。

    所以...您具有以下日志抽象

    case class Log (username: String, date: String, status: Int, content: Int, byte: Int)
    

    队列-如何仅从 RDD 中获取一列。

    回答-您具有 RDD 的map函数。因此,对于RDD[A]map使用类型为A => B的 map/transform 函数将其转换为RDD[B]

    val logRdd: RDD[Log] = ...
    
    val byteRdd = logRdd
      .filter(l => l.bytes != 0)
      .map(l => l.byte)
    

    --我怎么才能对它们求和?

    Ans-您可以使用reduce/fold/aggregate完成此操作。

    val sum = byteRdd.reduce((acc, b) => acc + b)
    
    val sum = byteRdd.fold(0)((acc, b) => acc + b)
    
    val sum = byteRdd.aggregate(0)(
      (acc, b) => acc + b,
      (acc1, acc2) => acc1 + acc2
    )
    

    注意::这里要注意的重要一点是Int的总和可能大于Int的总和。因此,在大多数实际情况下,我们至少应使用Long作为累加器,而不是Int,实际上会删除reducefold作为选项。而且,我们将只剩下一个汇总。

    val sum = byteRdd.aggregate(0l)(
      (acc, b) => acc + b,
      (acc1, acc2) => acc1 + acc2
    )
    

    现在,如果您必须计算最小值,最大值,平均数之类的多个参数,那么我建议您使用一个aggregate而不是像这样的多个参数来计算它们,

    // (count, sum, min, max)
    val accInit = (0, 0, Int.MaxValue, Int.MinValue)
    
    val (count, sum, min, max) = byteRdd.aggregate(accInit)(
      { case ((count, sum, min, max), b) => 
          (count + 1, sum + b, Math.min(min, b), Math.max(max, b)) },
      { case ((count1, sum1, min1, max1), (count2, sum2, min2, max2)) => 
          (count1 + count2, sum1 + sum2, Math.min(min1, min2), Math.max(max1, max2)) }
    })
    
    val avg = sum.toDouble / count
    
  • 0

    看看DataFrame API。您需要将 RDD 转换为 DataFrame,然后可以使用最小,最大,平均功能,如下所示:

    val rdd = cleanRdd.filter(x => x.bytes != 0)
    val df = sparkSession.sqlContext.createDataFrame(rdd, classOf[Log])
    

    假设您要对第bytes列进行操作,则

    import org.apache.spark.sql.functions._
    
    df.select(avg("bytes")).show
    df.select(min("bytes")).show
    df.select(max("bytes")).show
    

    更新:

    在 spark-shell 中尝试了以下内容。检查结果的屏幕截图...

    case class Log (username: String, date: String, status: Int, content: Int)
    
    val inputRDD = sc.parallelize(Seq(Log("199.72.81.55","01/Jul/1995:00:00:01 -0400",200,6245), Log("unicomp6.unicomp.net","01/Jul/1995:00:00:06 -0400",200,3985), Log("199.120.110.21","01/Jul/1995:00:00:09 -0400",200,4085), Log("burger.letters.com","01/Jul/1995:00:00:11 -0400",304,0), Log("199.120.110.21","01/Jul/1995:00:00:11 -0400",200,4179)))
    
    val rdd = inputRDD.filter(x => x.content != 0)
    
    val df = rdd.toDF("username", "date", "status", "content")
    
    df.printSchema
    
    import org.apache.spark.sql.functions._
    
    df.select(avg("content")).show
    df.select(min("content")).show
    df.select(max("content")).show
    

    在此处输入图片说明

    在此处输入图片说明

相关问题