PySpark 流作业-避免对象序列化

我正在写 PySpark 作业,但遇到一些性能问题。基本上,它所做的只是从 Kafka 中读取事件并记录所做的转换。事实是,转换是基于对象的函数计算的,并且该对象相当沉重,因为它包含一个 Graph 和一个自动更新的 inner-cache。因此,当我编写以下代码时:

analyzer = ShortTextAnalyzer(root_dir)
logger.info("Start analyzing the documents from kafka")
ssc.union(*streams).filter(lambda x: x[1] != None).foreachRDD(lambda rdd: rdd.foreach(lambda record: analyzer.analyze_short_text_event(record[1])))

它将我的analyzer序列化,因为该图要花费很多时间,并且在复制到执行程序时,缓存仅与该特定的 RDD 相关。

如果这项工作是用 Scala 编写的,那么我可以编写一个对象,该对象将存在于每个执行器中,然后不必每次都序列化我的对象。

有没有办法在 Python 中做到这一点?为每个执行者创建一次我的对象,然后可以避免序列化过程?

预先感谢:)

**更新:**我已经阅读了在 PySpark 中处理数据之前如何在所有 Spark 工作者上运行功能?帖子,但是那里的答案是关于共享文件或广播变量的。我的对象不是 read-only,因此无法广播。它不断更新它的 inner-cache,这就是为什么我要在每个执行器上使用它的一个对象(以避免序列化的原因)。

回答(1)

4 years ago

最后,我避免将我的对象序列化,这是将我的类变成静态类-仅使用类变量和类方法。这样,每个执行程序都将一次导入该类(及其相关变量),并且不需要序列化。