在Apache Spark SQL中运行多行

loading...


2

我正在尝试在Apache Spark SQL中创建一个函数,该函数对多行数据进行操作,但是无法在Spark中直接找到这样做的方法 - 在Java中 .

我目前的解决方案是从Spark DataFrames中提取数据并将其提取到Java Lists中进行处理,然后再返回到Spark DataFrame . 这在性能方面并不理想 .

最好的选择似乎是Window functions,但不幸的是这些需要Hive上下文,我无权访问 . explode() function似乎是另一种选择,但同样,这是Scala特定的,我无法在Java中使用它 .

也许这可以通过将DataFrame转换回RDD来完成?

如果有人对如何使用Java中的Apache Spark SQL做任何提示或建议,那将非常感激 . 谢谢 .

Update :提供的示例:

+----------+-----------+------------+
|   Item   | Timestamp | Difference |
+----------+-----------+------------+
|     A    |   11:00   |    02:00   |
|     A    |   13:00   |      -     |
+----------+-----------+------------+
|     B    |   09:00   |      -     |
+----------+-----------+------------+
|     C    |   15:15   |    00:20   |
|     C    |   15:35   |    01:30   |
|     C    |   17:05   |      -     |
+----------+-----------+------------+

所以在这个例子中,我试图对按行分组的行对进行操作,以计算每个项目行之间的时差 .

使用SQL中的LAG()和LEAD()函数可以执行这样的任务,但这些任务需要Spark中的Hive .

1回答

  • 0

    从Spark 1.5开始,您现在可以定义UDAF或用户定义聚合函数,以便对输入数据组执行自定义聚合 . 我想这可能是我看到的最接近你正在寻找的东西 .

    通常,您需要创建一个扩展 UserDefinedAggregateFunction 的类,并实现涉及初始化,合并和聚合的必需方法 .

    一旦你创建了它,你可以只是实例化它,注册它,然后在你的SQL中使用它 .

    val myAggregation = new MyAggregation 
    sqlContext.udf.register("MY_AGG", myAggregation)
    

    https://databricks.com/blog/2015/09/16/spark-1-5-dataframe-api-highlights-datetimestring-handling-time-intervals-and-udafs.html

loading...

评论

暂时没有评论!