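I want to aggregate (count) under two different conditions and assign the results to two different columns. Can anyone suggest a simple way to do this?
In the solution I tried, the two counts come out the same even though the filter conditions are different: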
num_ins_rec_cnt = F.count(col("ins_upd_flag") == "I").alias("ins_rec_cnt")
num_upd_rec_cnt = F.count(col("ins_upd_flag") == "U").alias("upd_rec_cnt")
delta_process_max_ld_df = (
    cdc_all_record_sk_ld_df
    .agg(
        F.max('delta_account_sk_id').alias("max_account_sk_id"),
        num_ins_rec_cnt,
        num_upd_rec_cnt,
    )
    .withColumn("lkp_process_name", lit(process_name))
    .withColumn("history_tbl_cnt", lit(base_rec_count))
    .withColumn("delta_tbl_cnt", lit(delta_rec_count))
)
The output is:
+-----------------+-----------+-----------+--------------------+---------------+-------------+
|max_account_sk_id|ins_rec_cnt|upd_rec_cnt| lkp_process_name|history_tbl_cnt|delta_tbl_cnt|
+-----------------+-----------+-----------+--------------------+---------------+-------------+
| 25099| 5100| 5100|amc_account_delta_ld| 19999| 20099|
+-----------------+-----------+-----------+--------------------+---------------+-------------+
But the counts should be:
+-------+---------------+
| _c0   | ins_upd_flag  |
+-------+---------------+
| 5100  | I             |
| 5000  | U             |
+-------+---------------+
Sample Data:
+--------------+-------------------+--------------+-------------------+--------------------+------------------+-------------------+--------------------+--------------+------------+
|delta_acct_nbr|delta_account_sk_id|delta_zip_code|delta_primary_state|delta_eff_start_date|delta_eff_end_date| delta_load_tm| delta_hash_key|delta_eff_flag|ins_upd_flag|
+--------------+-------------------+--------------+-------------------+--------------------+------------------+-------------------+--------------------+--------------+------------+
| ID330020000| 20000| 02345| CA| 2016-11-10| 3099-12-31|2016-11-10 14:53:52|19DEDD4F9A55845E8...| Y| I|
| ID330020001| 20001| 02345| CA| 2016-11-10| 3099-12-31|2016-11-10 14:53:52|19DEDD4F9A55845E8...| Y| I|
| ID330020002| 20002| 02345| CA| 2016-11-10| 3099-12-31|2016-11-10 14:53:52|19DEDD4F9A55845E8...| Y|
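1 Answer
I solved the problem as follows: flag each insert row with 1 and every other row with 0, sum the flag to get the insert count, count all rows, and subtract to get the update count.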
from pyspark.sql import functions as F
from pyspark.sql.functions import lit

delta_process_max_ld_df = (
    cdc_all_record_sk_ld_df
    # Flag each row: 1 for an insert ('I'), 0 for an update ('U') or anything else
    .withColumn(
        'ins_upd_flag_cnt',
        F.when(cdc_all_record_sk_ld_df.ins_upd_flag == 'I', 1)
        .when(cdc_all_record_sk_ld_df.ins_upd_flag == 'U', 0)
        .otherwise(0),
    )
    .agg(
        F.max('delta_account_sk_id').alias("max_surrogate_id"),
        F.sum('ins_upd_flag_cnt').alias("insert_record_cnt"),  # number of inserts
        F.count('*').alias("ins_upd_count"),                   # total number of rows
    )
    .withColumn("process_name", lit(process_name))
    .withColumn("process_run_date", lit(load_dt))
    .withColumn("base_tbl_cnt", lit(base_rec_count))
    .withColumn("delta_tbl_cnt", lit(delta_rec_count))
    .withColumn("load_tm", lit(load_tm))
    .withColumn("tbl_name", lit(tgt_tbl_nm))
    .withColumn("load_date", lit(process_run_date))
    # .select(*status_tbl_columns)
)

# Update count = total rows minus inserts
delta_process_max_ld_df1 = (
    delta_process_max_ld_df
    .withColumn(
        "upd_record_cnt",
        lit(delta_process_max_ld_df.ins_upd_count - delta_process_max_ld_df.insert_record_cnt),
    )
    .select(*status_tbl_columns)
)