
PySpark DataFrame aggregation: counting rows under different conditions


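I want to aggregate (count) rows under two different conditions and assign the results to two different columns. Can anyone suggest a simple way to do this?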

In the solution I tried, both counts come out the same even though the filter conditions differ:

num_ins_rec_cnt = F.count(col("ins_upd_flag") == "I").alias("ins_rec_cnt")
num_upd_rec_cnt = F.count(col("ins_upd_flag") == "U").alias("upd_rec_cnt")
delta_process_max_ld_df = (
    cdc_all_record_sk_ld_df
    .agg(
        F.max('delta_account_sk_id').alias("max_account_sk_id"),
        num_ins_rec_cnt,
        num_upd_rec_cnt,
    )
    .withColumn("lkp_process_name", lit(process_name))
    .withColumn("history_tbl_cnt", lit(base_rec_count))
    .withColumn("delta_tbl_cnt", lit(delta_rec_count))
)

The output is:

+-----------------+-----------+-----------+--------------------+---------------+-------------+
|max_account_sk_id|ins_rec_cnt|upd_rec_cnt|    lkp_process_name|history_tbl_cnt|delta_tbl_cnt|
+-----------------+-----------+-----------+--------------------+---------------+-------------+
|            25099|       5100|       5100|amc_account_delta_ld|          19999|        20099|
+-----------------+-----------+-----------+--------------------+---------------+-------------+

But the counts per flag should be:

+-------+---------------+--+
|  _c0  | ins_upd_flag  |
+-------+---------------+--+
| 5100  | I             |
| 5000  | U             |
+-------+---------------+--+

Sample Data:
+--------------+-------------------+--------------+-------------------+--------------------+------------------+-------------------+--------------------+--------------+------------+
|delta_acct_nbr|delta_account_sk_id|delta_zip_code|delta_primary_state|delta_eff_start_date|delta_eff_end_date|      delta_load_tm|      delta_hash_key|delta_eff_flag|ins_upd_flag|
+--------------+-------------------+--------------+-------------------+--------------------+------------------+-------------------+--------------------+--------------+------------+
|   ID330020000|              20000|         02345|                 CA|          2016-11-10|        3099-12-31|2016-11-10 14:53:52|19DEDD4F9A55845E8...|             Y|           I|
|   ID330020001|              20001|         02345|                 CA|          2016-11-10|        3099-12-31|2016-11-10 14:53:52|19DEDD4F9A55845E8...|             Y|           I|
|   ID330020002|              20002|         02345|                 CA|          2016-11-10|        3099-12-31|2016-11-10 14:53:52|19DEDD4F9A55845E8...|             Y|

1 Answer


    I solved the problem as follows.

    from pyspark.sql import functions as F
    from pyspark.sql.functions import lit

    # Flag inserts as 1 and everything else as 0, so summing the flag counts the inserts.
    delta_process_max_ld_df = (
        cdc_all_record_sk_ld_df
        .withColumn(
            "ins_upd_flag_cnt",
            F.when(cdc_all_record_sk_ld_df.ins_upd_flag == "I", 1)
            .when(cdc_all_record_sk_ld_df.ins_upd_flag == "U", 0)
            .otherwise(0),
        )
        .agg(
            F.max("delta_account_sk_id").alias("max_surrogate_id"),
            F.sum("ins_upd_flag_cnt").alias("insert_record_cnt"),
            F.count("*").alias("ins_upd_count"),
        )
        .withColumn("process_name", lit(process_name))
        .withColumn("process_run_date", lit(load_dt))
        .withColumn("base_tbl_cnt", lit(base_rec_count))
        .withColumn("delta_tbl_cnt", lit(delta_rec_count))
        .withColumn("load_tm", lit(load_tm))
        .withColumn("tbl_name", lit(tgt_tbl_nm))
        .withColumn("load_date", lit(process_run_date))
        # .select(*status_tbl_columns)
    )

    # Updates = total row count minus inserts.
    delta_process_max_ld_df1 = (
        delta_process_max_ld_df
        .withColumn(
            "upd_record_cnt",
            lit(delta_process_max_ld_df.ins_upd_count - delta_process_max_ld_df.insert_record_cnt),
        )
        .select(*status_tbl_columns)
    )
