我有一个RDD[Log]
文件,其中包含多个字段(username,content,date,bytes
),并且我想为每个 field/column 查找不同的内容。
例如,我想获取 RDD 中的 min/max 和平均字节。当我做:
val q1 = cleanRdd.filter(x => x.bytes != 0)
我得到 RDD 的完整行,其字节数!=0。但是如何实际求和,计算平均值,找到 min/max 等?如何仅从 RDD 中获取一列并对其进行转换?
编辑:普拉萨德告诉我有关更改类型数据帧时,他就如何使虽然没有做说明,而我不能在网站上找到了坚实的答案。任何帮助都会很棒。
编辑:日志类:
case class Log (username: String, date: String, status: Int, content: Int)
使用 cleanRdd.take(5).foreach(println)会得到这样的结果
Log(199.72.81.55 ,01/Jul/1995:00:00:01 -0400,200,6245)
Log(unicomp6.unicomp.net ,01/Jul/1995:00:00:06 -0400,200,3985)
Log(199.120.110.21 ,01/Jul/1995:00:00:09 -0400,200,4085)
Log(burger.letters.com ,01/Jul/1995:00:00:11 -0400,304,0)
Log(199.120.110.21 ,01/Jul/1995:00:00:11 -0400,200,4179)
2 回答
好吧...您有很多问题。
所以...您具有以下日志抽象
队列-如何仅从 RDD 中获取一列。
回答-您具有 RDD 的
map
函数。因此,对于RDD[A]
,map
使用类型为A => B
的 map/transform 函数将其转换为RDD[B]
。--我怎么才能对它们求和?
Ans-您可以使用
reduce
/fold
/aggregate
完成此操作。注意::这里要注意的重要一点是
Int
的总和可能大于Int
的总和。因此,在大多数实际情况下,我们至少应使用Long
作为累加器,而不是Int
,实际上会删除reduce
和fold
作为选项。而且,我们将只剩下一个汇总。现在,如果您必须计算最小值,最大值,平均数之类的多个参数,那么我建议您使用一个
aggregate
而不是像这样的多个参数来计算它们,看看
DataFrame
API。您需要将 RDD 转换为 DataFrame,然后可以使用最小,最大,平均功能,如下所示:假设您要对第
bytes
列进行操作,则更新:
在 spark-shell 中尝试了以下内容。检查结果的屏幕截图...