首页 文章

PySpark分发模块导入

提问于
浏览
6

在过去的几天里,我一直在努力了解Spark执行者如何知道在导入时如何使用给定名称的模块 . 我正在使用AWS EMR . 情况:我通过键入来初始化EMR上的pyspark

pyspark - 主纱

然后,在pyspark,

import numpy as np ## notice the naming

def myfun(x):
    n = np.random.rand(1)
    return x*n

rdd = sc.parallelize([1,2,3,4], 2)
rdd.map(lambda x: myfun(x)).collect() ## works!

我的理解是,当我导入 numpy as np 时,主节点是唯一导入和标识 numpynp 的节点 . 但是,对于EMR集群(2个工作节点),如果我在rdd上运行map函数,则驱动程序将该函数发送到工作节点以执行列表中每个项目的功能(对于每个分区),以及返回成功结果 .

我的问题是: Worker 们如何知道numpy应该作为np导入?每个worker都已经安装了numpy,但是我没有为每个节点明确定义一种方法来导入模块 as np .

有关依赖关系的更多详细信息,请参阅Cloudera的以下帖子:http://blog.cloudera.com/blog/2015/09/how-to-prepare-your-apache-hadoop-cluster-for-pyspark-jobs/

Complex Dependency 下,他们有一个示例(代码),其中pandas模块在每个节点上显式导入 .

我听说过的一个理论是驱动程序分发在pyspark交互式shell中传递的所有代码 . 我对此持怀疑态度 . 我提出的反驳这个想法的例子是,如果在主节点上我输入:

print "hello"

是每个 Worker 节点还打印“你好”?我不这么认为 . 但也许我错了 .

1 回答

  • 4

    当函数被序列化时,有number of objects is being saved

    • code

    • globals

    • 默认值

    • closure

    • 字典

    这可以在以后用于恢复给定功能所需的完整环境 .

    由于 np 被函数引用,因此可以从其代码中提取它:

    from pyspark.cloudpickle import CloudPickler
    
    CloudPickler.extract_code_globals(myfun.__code__)
    ## {'np'}
    

    和绑定可以从 globals 中提取:

    myfun.__globals__['np']
    ## <module 'numpy' from ...
    

    因此,序列化闭包(广义上)捕获恢复环境所需的所有信息 . 当然,闭包中访问的所有模块都必须可以在每台工作机器上导入 .

    其他一切只是读写机器 .

    另外,主节点不应该执行任何Python代码 . 它负责不运行应用程序代码的资源分配 .

相关问题