
PySpark reduceByKey aggregation after collect_list on a column


I want to aggregate over the 'states' collected by collect_list, following the example below.

Example code:

import operator
states = sc.parallelize(["TX","TX","CA","TX","CA"])
states.map(lambda x: (x, 1)).reduceByKey(operator.add).collect()
#printed output: [('TX', 3), ('CA', 2)]

My code:

from pyspark import SparkContext,SparkConf
from pyspark.sql.session import SparkSession
from pyspark.sql.functions import collect_list
import operator
conf = SparkConf().setMaster("local")
conf = conf.setAppName("test")
sc = SparkContext.getOrCreate(conf=conf)
spark = SparkSession(sc)
rdd = sc.parallelize([('20170901',['TX','TX','CA','TX']), ('20170902', ['TX','CA','CA']), ('20170902',['TX']) ])
df = spark.createDataFrame(rdd, ["datatime", "actionlist"])
df = df.groupBy("datatime").agg(collect_list("actionlist").alias("actionlist"))

rdd = df.select("actionlist").rdd.map(lambda x:(x,1))#.reduceByKey(operator.add)
print (rdd.take(2))
#printed output: [(Row(actionlist=[['TX', 'CA', 'CA'], ['TX']]), 1), (Row(actionlist=[['TX', 'TX', 'CA', 'TX']]), 1)]
#for the next step, it should look like:
#[Row(actionlist=[('TX',1), ('CA',1), ('CA',1), ('TX',1)]), Row(actionlist=[('TX',1), ('TX',1), ('CA',1), ('TX',1)])]

What I want is:

20170901,[('TX', 3), ('CA', 1)]
20170902,[('TX', 2), ('CA', 2)]

I think the first step is to flatten the collect_list result, and I have tried:

udf(lambda x: list(chain.from_iterable(x)), StringType())
udf(lambda items: list(chain.from_iterable(itertools.repeat(x, 1) if isinstance(x, str) else x for x in items)))
udf(lambda l: [item for sublist in l for item in sublist])

But no luck so far. The next step is to make KV pairs and do the reduce. I have been stuck here for a while; can any Spark expert help with the logic? Thanks for your help!
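
For reference, a minimal sketch of the RDD route described above (not the original attempt): flatten each row's nested lists into ((date, state), 1) pairs, reduceByKey, then regroup per date. It reuses the df defined earlier; the final groupByKey shape is an assumption about the desired output.

import operator

# Sketch: one ((date, state), 1) pair per state occurrence, then reduce.
pairs = (df.rdd
           .flatMap(lambda row: [((row.datatime, s), 1)
                                 for sublist in row.actionlist
                                 for s in sublist])
           .reduceByKey(operator.add)                      # count per (date, state)
           .map(lambda kv: (kv[0][0], (kv[0][1], kv[1])))  # (date, (state, count))
           .groupByKey()                                   # regroup counts per date
           .mapValues(list))
print(pairs.collect())
# e.g. [('20170901', [('TX', 3), ('CA', 1)]), ('20170902', [('TX', 2), ('CA', 2)])]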

2 Answers

  • 2

    You can achieve it by using reduce and Counter inside a udf. I tried it my own way; hope this helps.

    >>> from functools import reduce
    >>> from collections import Counter
    >>> from pyspark.sql.types import *
    >>> from pyspark.sql import functions as F
    >>> rdd = sc.parallelize([('20170901',['TX','TX','CA','TX']), ('20170902', ['TX','CA','CA']), ('20170902',['TX']) ])
    >>> df = spark.createDataFrame(rdd, ["datatime", "actionlist"])
    >>> df = df.groupBy("datatime").agg(F.collect_list("actionlist").alias("actionlist"))
    >>> def someudf(row):
            # concatenate the sub-lists into a single flat list
            value = reduce(lambda x, y: x + y, row)
            # count each state and return (state, count) pairs
            return Counter(value).most_common()
    
    >>> schema = ArrayType(StructType([
        StructField("char", StringType(), False),
        StructField("count", IntegerType(), False)]))
    
    >>> udf1 = F.udf(someudf,schema)
    >>> df.select('datatime',udf1(df.actionlist)).show(2,False)
    +--------+-------------------+
    |datatime|someudf(actionlist)|
    +--------+-------------------+
    |20170902|[[TX,2], [CA,2]]   |
    |20170901|[[TX,3], [CA,1]]   |
    +--------+-------------------+
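
    A udf-free variant, as a sketch: assuming Spark 2.4+ (for F.flatten), you can flatten the nested arrays, explode to one row per state, and count, all with built-in functions:

    >>> flat = df.withColumn("state", F.explode(F.flatten("actionlist")))
    >>> flat.groupBy("datatime", "state").count().show()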
    
  • 2

    You can do this by using combineByKey():

    from collections import Counter
    # `rdd` here is the original (date, list) RDD from the question;
    # combineByKey takes createCombiner, mergeValue, and mergeCombiners.
    count = rdd.combineByKey(lambda v: Counter(v),
                             lambda c, v: c + Counter(v),
                             lambda c1, c2: c1 + c2)
    print(count.collect())
    # [('20170901', Counter({'TX': 3, 'CA': 1})), ('20170902', Counter({'CA': 2, 'TX': 2}))]
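
    To get the asked-for shape exactly (a list of (state, count) tuples per date), each Counter can be converted with most_common(); a small follow-up sketch using the count RDD above:

    print(count.mapValues(lambda c: c.most_common()).collect())
    # e.g. [('20170901', [('TX', 3), ('CA', 1)]), ('20170902', [('TX', 2), ('CA', 2)])]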
    
