我试图使用for循环向数据帧添加新行 . 所以输入是:
ColA ColNum ColB ColB_lag1 ColB_lag2
Xyz 25 123 234 345
Abc 40 456 567 678
我想要的输出是这样的:
ColA ColNum ColB ColB_lag1 ColB_lag2
Xyz 25 123 234 345
Xyz 26 789 123 234
Abc 40 456 567 678
Abc 41 890 456 567
所以,我的代码是这样的:
df = df.withColumn("ColNum", (df.ColNum + 1).cast(IntegerType())) \
.withColumn("ColB_lag2", df.ColB_lag1)
.withColumn("ColB_lag1", df.ColB)
.withColumn("ColB", someFunc())
当我必须只添加一行时,代码工作正常,但是当我必须在循环中添加多行时,代码会中断 . 所以我使用For循环来完成它 . 我在循环开始时过滤最新的行,然后运行上面的逻辑来计算列的值 . 然后将新行追加到数据集中,该数据集再次在循环顶部使用 . 输出最终看起来像这样:
ColA ColNum ColB ColB_lag1 ColB_lag2
Xyz 25 123 234 345
Xyz 25 789 123
Xyz 26 789 123
Abc 40 456 567 678
Abc 40 890 456
Abc 41 890 456
问题是:PySpark中的'For'循环是否因并行化而中断,或者我在for循环(或循环中的函数顺序)中链接了太多函数导致这种不稳定的行为?
如果我错过了任何关键点,很高兴分享更多细节 .
编辑1:For循环如下:
num_months = 5
df_final = sc.read.csv(input_path, header='true').createOrReplaceTempView("df_final")
for i in range(num_months):
df = sc.sql("""
SELECT *
FROM df_final mrd
INNER JOIN
(SELECT ColA AS ColA_tmp, MAX(fh_effdt) AS max_fh_effdt
FROM df_final
GROUP BY ColA) grouped_mrd
ON mrd.ColA = grouped_mrd.ColA_tmp
AND mrd.fh_effdt = grouped_mrd.max_fh_effdt
""")
df = df.drop(df.ColA_tmp).drop(df.max_fh_effdt).drop(df.ColB_lag2)
df_tmp = df.withColumn("ColNum", (df.wala + 1).cast(IntegerType())) \
.withColumn("ColB_lag2", df.ColB_lag1) \
.withColumn("ColB_lag1", df.ColB) \
.withColumn("ColB", someFunc())
df_final = df_final.union(df_tmp)
df_final.persist()
df_final.coalesce(1).write.csv(output_path + scenario_name+"_df_final", mode='overwrite', header='true')
解决方案:问题在于工会 . 由于我正在删除列并重新计算它们,因此spark将这些列添加到结尾,“Union”按列位置而不是名称进行并集 . 这就是在后续循环中创建问题的原因,因为新行的数据移动了几列 . 解决方案是逐字地选择所有列并在进行并集之前重新排序它们 . 上面的代码段简化了,我可以在不丢弃ColB_lag2的情况下完成 . 实际代码之间还有另一个步骤,我从另一个数据帧连接中刷新一些值,并且在从新数据帧引入之前需要删除这些列 .
1 回答
您的问题是您正在创建数据框版本的临时视图(来自csv数据源的原始数据),并期望它反映对
df_final
数据框变量所做的更改 .临时视图
df_final
不包含循环运行时对数据框df_final
所做的数据 . 数据框是不可变的 . 解决此问题的一种方法是替换循环中的临时视图: