首页 文章

在Spark中应用具有非常量帧大小的窗口函数

提问于
浏览
1

My Problem

我目前面临Spark窗口功能的困难 . 我正在使用Spark(通过pyspark)版本 1.6.3 (相关的Python版本 2.6.6 ) . 我运行一个pyspark shell实例,自动初始化 HiveContext 作为我的 sqlContext .

我想用 window 函数做一个滚动的总和 . 我的问题是窗框没有固定:它取决于我们考虑的观察 . 更具体地说,我通过名为 rank_id 的变量对数据进行排序,并希望对索引$ x 1 $和$ 2x-1 $之间的任何索引$ x $进行滚动求和 . 因此,我的 rangeBetween 必须依赖于 rank_id 变量值 .

重要的是,我 don't want to collect data 因此不能使用像 numpy 这样的东西(我的数据有很多观察结果) .

Reproducible example

from pyspark.mllib.random import RandomRDDs
import pyspark.sql.functions as psf
from pyspark.sql.window import Window

# Reproducible example
data = RandomRDDs.uniformVectorRDD(sc, 15, 2)
df = data.map(lambda l: (float(l[0]), float(l[1]))).toDF()
df = df.selectExpr("_1 as x", "_2 as y")

#df.show(2)
#+-------------------+------------------+                                        
#|                  x|                 y|
#+-------------------+------------------+
#|0.32767742062486405|0.2506351566289311|
#| 0.7245348534550357| 0.597929853274274|
#+-------------------+------------------+
#only showing top 2 rows

# Finalize dataframe creation
w = Window().orderBy("x")
df = df.withColumn("rank_id", psf.rowNumber().over(w)).sort("rank_id")
#df.show(3)
#+--------------------+--------------------+-------+                             
#|                   x|                   y|rank_id|
#+--------------------+--------------------+-------+
#|0.016536160706045577|0.009892450530381458|      1|
#| 0.10943843181953838|  0.6478505849227775|      2|
#| 0.13916818312857027| 0.24165348228464578|      3|
#+--------------------+--------------------+-------+
#only showing top 3 rows

Fixed width cumulative sum: no problem

使用 window 函数,我能够在给定数量的索引上运行累积和(我在这里使用 rangeBetween 但是对于这个例子, rowBetween 可以无差别地使用) .

w = Window.orderBy('rank_id').rangeBetween(-1,3)
df1 = df.select('*', psf.sum(df['y']).over(w).alias('roll1'))
#df1.show(3)
#+--------------------+--------------------+-------+------------------+          
#|                   x|                   y|rank_id|             roll1|
#+--------------------+--------------------+-------+------------------+
#|0.016536160706045577|0.009892450530381458|      1|0.9698521852602887|
#| 0.10943843181953838|  0.6478505849227775|      2|1.5744700156326066|
#| 0.13916818312857027| 0.24165348228464578|      3|2.3040547273760392|
#+--------------------+--------------------+-------+------------------+
#only showing top 3 rows

Cumulative sum width not fixed

我想在索引x 1和2x-1之间求和,其中x是我的行索引 . 当我尝试将它传递给Spark时(类似于我们为_1078444做的那样也许就是这个问题),我得到了以下错误

# Now if I want to make rangeBetween size depend on a variable
w = Window.orderBy('rank_id').rangeBetween('rank_id'+1,2*'rank_id'-1)

Traceback(最近调用最后一次):TypeError中的文件“”,第1行:无法连接'str'和'int'对象

我尝试了其他的东西,使用SQL语句

# Using SQL expression
df.registerTempTable('tempdf')
df2 = sqlContext.sql("""
   SELECT *, SUM(y)
   OVER (ORDER BY rank_id
   RANGE BETWEEN rank_id+1 AND 2*rank_id-1) AS cumsum
   FROM tempdf;
""")

这次给了我以下错误

回溯(最近一次调用最后一次):文件“”,第6行,在文件“/opt/application/Spark/current/python/pyspark/sql/context.py”中,行> 580,在sql返回DataFrame(self . _ssql_ctx.sql(sqlQuery),self)文件“/opt/application/Spark/current/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py”,第813行,在调用文件“/ opt / application中/Spark/current/python/pyspark/sql/utils.py“,第51行,在deco中引发AnalysisException(s.split(':',1)[1],stackTrace)pyspark.sql.utils.AnalysisException:u”无法识别windowframeboundary中'rank_id''''1'附近的输入;第3行pos 15“

我还注意到,当我使用 SQL OVER 子句尝试更简单的语句时,我得到了类似的错误,这可能意味着我没有正确地将SQL语句传递给Spark

df2 = sqlContext.sql("""
   SELECT *, SUM(y)
   OVER (ORDER BY rank_id
   RANGE BETWEEN -1 AND 1) AS cumsum
   FROM tempdf;
 """)

回溯(最近一次调用最后一次):文件“”,第6行,在文件“/opt/application/Spark/current/python/pyspark/sql/context.py”,第580行,在sql返回DataFrame(self._ssql_ctx) .sql(sqlQuery),self)文件“/opt/application/Spark/current/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py”,第813行,在调用文件“/ opt / application / Spark / current / python / pyspark / sql / utils.py“,第51行,在deco中引发AnalysisException(s.split(':',1)[1],stackTrace)pyspark.sql.utils.AnalysisException:u”不能识别在windowframeboundary中' - ''''和'附近的输入;第3行pos 15“

如何在Spark中使用 windowSQL 语句来解决我的问题?

1 回答

  • 0

    如何通过在Spark中使用窗口或SQL语句来解决我的问题?

    TL;DR 您不能,或者至少不能以可扩展的方式使用当前要求 . 您可以尝试类似于滑动RDD:How to transform data with sliding window over time series data in Pyspark

    我还注意到,当我使用SQL OVER子句尝试更简单的语句时,我得到了类似的错误,这可能意味着我没有正确地将SQL语句传递给Spark

    这是不正确的 . 范围规范要求( PRECEDING | FOLLOWING | CURRENT_ROW )规范 . 也应该没有分号:

    SELECT *, SUM(x)
    OVER (ORDER BY rank_id
    RANGE BETWEEN 1 PRECEDING AND 1 FOLLOWING) AS cumsum
    FROM tempdf
    

    我想在索引x 1和2x-1之间求和,其中x是我的行索引 . 当我尝试将它传递给Spark时(类似于我们为orderBy做的可能就是这个问题),我得到了以下错误... TypeError:无法连接'str'和'int'对象

    例外情况说 - 你不能在字符串和整数上调用 + . 你可能想要列:

    from pyspark.sql.functions import col
    
    .rangeBetween(col('rank_id') + 1,  2* col('rank_id') - 1)
    

    但这不受支持 . 范围必须是固定大小,不能用表达式定义 .

    重要的一点是我不想收集数据

    没有 partitionBy 的窗口定义:

    w = Window.orderBy('rank_id').rangeBetween(-1,3)
    

    和收集一样糟糕 . 所以,即使有“动态框架”(有条件和无界窗口)问题的解决方法,它们也无法帮到你 .

相关问题