
How to convert groupBy to reduceByKey in a PySpark DataFrame? [duplicate]


I wrote PySpark code using groupBy and the sum function. I suspect the groupBy is hurting performance, so I would like to use reduceByKey instead, but I am new to this area. Please find my scenario below:

Step 1: Read the joined Hive table data through sqlContext and store it in a DataFrame (a sketch of this step follows after these steps).

Step 2: There are 15 input columns in total; 5 of them are key fields and the rest are numeric values.

Step 3: In addition to the input columns above, more columns need to be derived from the numeric columns. A few columns have default values.

Step 4: I used groupBy and the sum function. How can I perform the same logic the Spark way, using map and reduceByKey?
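
For reference, a minimal sketch of what step 1 might look like; the database, table, and column names below are hypothetical placeholders, not taken from the original job:

# step 1 (sketch only): read the joined Hive data into a DataFrame via sqlContext
# my_db.fact_table, my_db.dim_table and the column names are illustrative
input_df = sqlContext.sql("""
    SELECT f.key1, f.key2, f.key3, f.key4, f.key5,
           f.metric1, f.metric2, d.metric3
    FROM my_db.fact_table f
    JOIN my_db.dim_table d ON f.key1 = d.key1
""")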

from pyspark.sql.functions import col, when, lit, concat, round, sum

#sample data
df = sc.parallelize([(1, 2, 3, 4), (5, 6, 7, 8)]).toDF(["col1", "col2", "col3", "col4"])

#populate col5, col6, col7
col5 = when((col('col1') == 0) & (col('col3') != 0), round(col('col4')/ col('col3'), 2)).otherwise(0)
col6 = when((col('col1') == 0) & (col('col4') != 0), round((col('col3') * col('col4'))/ col('col1'), 2)).otherwise(0)
col7 = col('col2')
df1 = df.withColumn("col5", col5).\
    withColumn("col6", col6).\
    withColumn("col7", col7)

#populate col8, col9, col10
col8 = when((col('col1') != 0) & (col('col3') != 0), round(col('col4')/ col('col3'), 2)).otherwise(0)
col9 = when((col('col1') != 0) & (col('col4') != 0), round((col('col3') * col('col4'))/ col('col1'), 2)).otherwise(0)
col10= concat(col('col2'), lit("_NEW"))
df2 = df.withColumn("col5", col8).\
    withColumn("col6", col9).\
    withColumn("col7", col10)

#final dataframe
final_df = df1.union(df2)
final_df.show()

#groupBy calculation
final_df.groupBy("col1", "col2", "col3", "col4").agg(sum("col5")........sum("coln")).show()
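
Side note: instead of spelling out every sum by hand, the list of aggregate expressions can be built programmatically; a minimal sketch, assuming the derived value columns in this sample are col5 and col6:

# build the sum expressions from a list of column names (the list is illustrative)
value_cols = ["col5", "col6"]
sum_exprs = [sum(col(c)).alias("sum_" + c) for c in value_cols]
final_df.groupBy("col1", "col2", "col3", "col4").agg(*sum_exprs).show()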

1 Answer


There is no reduceByKey in Spark SQL.

groupBy with aggregate functions works in a way almost identical to RDD.reduceByKey. Spark automatically decides whether the aggregation should behave like RDD.groupByKey (i.e. for collect_list) or like RDD.reduceByKey.

The performance of Dataset.groupBy with aggregate functions should be better than or equal to RDD.reduceByKey; the Catalyst optimizer takes care of how the aggregation is performed under the hood.
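
For comparison, here is a rough sketch of the hand-written map + reduceByKey equivalent of final_df.groupBy(...).agg(sum("col5")), using the column names from the sample above; this is illustrative only:

# sketch only: the same sum-by-key done manually on the underlying RDD
pairs = final_df.rdd.map(lambda r: ((r["col1"], r["col2"], r["col3"], r["col4"]), r["col5"]))
sums = pairs.reduceByKey(lambda a, b: a + b)
# convert back to a DataFrame if needed
result = sums.map(lambda kv: kv[0] + (kv[1],)).toDF(["col1", "col2", "col3", "col4", "sum_col5"])
result.show()

As noted above, groupBy with an aggregate function already performs this kind of map-side partial aggregation under the hood, so the manual version is normally not faster.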
