我正在使用用python编写的UDF来更改数字的基数 .
所以我读了一个镶木地板文件并写入镶木地板文件并应用UDF . 这是我运行的行:
input_df.withColumn("origin_base", convert_2_dest_base(input_df.origin_base)).write.mode('overwrite').parquet(destination_path)
这种转换使得利用大量内存产生火花,我得到了这样的警告:
17/06/18 08:05:39 WARN TaskSetManager:阶段4.0中丢失的任务40.0(TID 183,ip-10-100-5-196.ec2.internal,执行者19):ExecutorLostFailure(执行者19退出由一个引起正在运行的任务)原因:由于超出内存限制而被YARN杀死的容器 . 使用4.4 GB的4.4 GB物理内存 . 考虑提升spark.yarn.executor.memoryOverhead .
最后它失败了 .
UDF不是正确的方法吗?为什么消耗这么多内存?
2 回答
对于pyspark,数据在Python中处理并在JVM中缓存/混洗 . 如果您使用的是内置的Python API,那么scala的性能就没有太大区别 . 见python vs scala performance
当您使用udf时,由于您的本地定义函数未在本机JVM结构中注册,因此无法通过简单的Java API调用实现,因此必须对Python worker进行序列化/反序列化 . 然后,数据将在Python worker中处理,并序列化/反序列化回JVM .
Python工作者现在需要处理堆外存储器中的序列化数据,它会占用大量的堆外内存,因此通常会导致memoryOverhead .
性能方面,serialization很慢,而且通常是性能调优的关键 .
udf
函数使用serialization
和deserialization
方法columns conversion
. 这就是使用大量内存的原因 . 您可以查看spark functions的替代品 .