如何在PySpark中使用窗口函数?

我正在尝试使用一些Windows函数( ntilepercentRank )作为数据框,但我不知道如何使用它们 .

有人可以帮我这个吗?在Python API文档中没有关于它的示例 . (http://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=ntile#pyspark.sql.functions.ntile

具体来说,我试图在我的数据框中获得数字字段的分位数 .

我正在使用spark 1.4.0 .

回答(1)

2 years ago

为了能够使用窗口功能,您必须首先创建一个窗口 . 定义与普通SQL几乎相同,这意味着您可以定义顺序,分区或两者 . 首先让我们创建一些虚拟数据:

import numpy as np
np.random.seed(1)

keys = ["foo"] * 10 + ["bar"] * 10
values = np.hstack([np.random.normal(0, 1, 10), np.random.normal(10, 1, 100)])

df = sqlContext.createDataFrame([
   {"k": k, "v": round(float(v), 3)} for k, v in zip(keys, values)])

确保您使用的是 HiveContext (仅限Spark <2.0):

from pyspark.sql import HiveContext

assert isinstance(sqlContext, HiveContext)

创建一个窗口:

from pyspark.sql.window import Window

w =  Window.partitionBy(df.k).orderBy(df.v)

这相当于

(PARTITION BY k ORDER BY v)

在SQL中 .

根据经验,窗口定义应始终包含 PARTITION BY 子句,否则Spark会将所有数据移动到单个分区 . 某些功能需要 ORDER BY ,而在不同情况下(通常为聚合)可能是可选的 .

还有两个可选项,可用于定义窗口 Span - ROWS BETWEENRANGE BETWEEN . 在这种特定情况下,这些对我们没用 .

最后,我们可以将它用于查询:

from pyspark.sql.functions import percentRank, ntile

df.select(
    "k", "v",
    percentRank().over(w).alias("percent_rank"),
    ntile(3).over(w).alias("ntile3")
)

请注意 ntile 与分位数无关 .