
How to retrieve specific values from df.collect() in PySpark?

I have the following DataFrame df in PySpark.

import pyspark.sql.functions as func

df = spark\
        .read \
        .format("org.elasticsearch.spark.sql") \
        .load("my_index/my_mapping") \
        .groupBy(["id", "type"]) \
        .agg(
            func.count(func.lit(1)).alias("number_occurrences"),
            func.countDistinct("host_id").alias("number_hosts")
        )

ds = df.collect()

I use collect because the amount of data after grouping and aggregating is always small and fits in memory. I also have to use collect because I pass ds as an argument to a udf. collect returns a list of Row objects. How can I run the following query against this list: for the given id and type, return number_occurrences and number_hosts?

For example, let's assume df contains the following rows:

id   type   number_occurrences   number_hosts
1    xxx    11                   3
2    yyy    10                   4

After executing df.collect(), how can I retrieve number_occurrences and number_hosts where id equals 1 and type equals xxx? The expected result is:

number_occurrences = 11
number_hosts = 3
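
Since ds is eventually handed to a udf, here is a hedged sketch of how the collected rows might be captured in a udf closure. The lookup dict, the occurrences_for helper, and other_df are my illustrations, not part of the original setup; func is the alias imported above:

from pyspark.sql.types import LongType

# Index the collected rows once on the driver.
lookup = {(row["id"], row["type"]): row["number_occurrences"] for row in ds}

# The closure over `lookup` ships the small dict to the executors.
@func.udf(LongType())
def occurrences_for(row_id, row_type):
    return lookup.get((row_id, row_type), 0)

# Hypothetical usage on some other DataFrame that has id and type columns:
# other_df.withColumn("occ", occurrences_for("id", "type"))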

Update:

Maybe there is a more elegant solution?

# Linear scan over the collected rows.
id = 1
type = "xxx"
number_occurrences = 0
number_hosts = 0
for row in ds:
    if row["id"] == id and row["type"] == type:
        number_occurrences = row["number_occurrences"]
        number_hosts = row["number_hosts"]
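
A sketch of one possibly more elegant variant (my suggestion, not from the original post): since ds fits in memory, build a dict keyed by (id, type) once, so every lookup is O(1) instead of a scan:

lookup = {
    (row["id"], row["type"]): (row["number_occurrences"], row["number_hosts"])
    for row in ds
}
# Falls back to (0, 0) when the key is absent, like the loop above.
number_occurrences, number_hosts = lookup.get((1, "xxx"), (0, 0))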

1 Answer


    If your id is unique, which should be the case for an id, you can sort the collected list by id. The sort just guarantees the right order; if your ids happen to be sequential, you can also index the record directly at id - 1:

    test_df = spark.createDataFrame(
        [(1, "xxx", 11, 3), (2, "yyy", 10, 4)],
        ("id", "type", "number_occurrences", "number_hosts"),
    )
    row_id = 1
    # Python 3's sorted() takes key=, not the removed Python 2 cmp= argument.
    sorted_list = sorted(test_df.collect(), key=lambda row: row["id"])
    sorted_list[row_id - 1]["number_occurrences"], sorted_list[row_id - 1]["number_hosts"]
    

    Result:

    (11, 3)
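
    If the ids are unique but not sequential, a dict keyed by id avoids both the sort and the index arithmetic; a small sketch (my addition, not part of the original answer):

    rows_by_id = {row["id"]: row for row in test_df.collect()}
    rows_by_id[1]["number_occurrences"], rows_by_id[1]["number_hosts"]  # -> (11, 3)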
    
