首页 文章

pySpark addfile选项,在executor中的worker上发生了什么

提问于
浏览
1

很明显,为了更好地分配小型查找数据以使用广播变量 .

假设我们在纱线客户端模式下从主节点运行pySpark代码(spark submit) . 因此,应始终在主节点上创建应用程序驱动程序 . 我们从主节点上的本地路径读取文件 .

with open('/tmp/myfile.txt', 'r') as f:
    lookup = {}
    for line in f.readlines():
        line = parse(line) # Method parse uses re and return dict
        lookup[line['name']] = line['age']

然后我们创建 broadcast var 并使用它:

lookupBC = sc.broadcast(lookup)

output = sc.textFile('/path/to/hdfs/')\
    .map(lambda e: (lookupBC.value.get(e, e), 1))\
    .collect()

在我们的例子中,这个bc var是在驱动程序(主节点)上创建的,并且spark会在集群中的所有数据节点之间复制此var,其中创建执行程序将其保留在这些节点的内存中 . 因此,文件将被读取一次,然后分发给执行者 .

如果我们使用 addFile 选项会发生什么?

sc.addFile('/tmp/myfile.txt')

with open(SparkFiles.get('/tmp/myfile.txt')) as f:
    lookup = {}
    for line in f.readlines():
        line = parse(line) # Method parse uses re and return dict
        lookup[line['name']] = line['age']

output = sc.textFile('/path/to/hdfs/')\
    .map(lambda e: (lookup.get(e, e), 1))\
    .collect()

Spark会将文件 '/tmp/myfile.txt' 复制到每个节点,其中将创建执行程序 . 然后:

  • 文件读取次数是多少?特定节点上每个执行者一次?或每次任务一次?

  • 步骤是什么,如何在执行者上处理代码?

  • 使用什么更好的addFile或bc var?

  • spark会根据pyspark代码进行任何优化并创建隐式bc变量吗?

在执行程序日志中,我看到有关bc vars的信息,但我在代码中不使用任何内容:

18/03/21 15:36:27 INFO util.Utils: Fetching spark://172.25.235.201:36478/files/myfile.txt to /data/disk01/yarn/nm/usercache/testuser/appcache/application_1520754626920_6227/spark-f3d19076-0642-4db8-961d-99daae0dfaff/fetchFileTemp230224632617642846.tmp
18/03/21 15:36:27 INFO util.Utils: Copying /data/disk01/yarn/nm/usercache/testuser/appcache/application_1520754626920_6227/spark-f3d19076-0642-4db8-961d-99daae0dfaff/-17884647971521635771454_cache to /data/disk01/yarn/nm/usercache/testuser/appcache/application_1520754626920_6227/container_1520754626920_6227_01_000002/./myfile.txt
18/03/21 15:36:28 INFO broadcast.TorrentBroadcast: Started reading broadcast variable 1
18/03/21 15:36:28 INFO client.TransportClientFactory: Successfully created connection to strt01we.ebb.er.com/172.25.235.216:43791 after 4 ms (0 ms spent in bootstraps)
18/03/21 15:36:28 INFO memory.MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 6.3 KB, free 366.3 MB)
18/03/21 15:36:28 INFO broadcast.TorrentBroadcast: Reading broadcast variable 1 took 551 ms

1 回答

  • 1

    广播变量似乎在内存中加载,直到它们被明确销毁 . 相比之下, sc.addFile 似乎正在为磁盘创建一个副本(对于每个执行程序) . 所以我会猜到 SparkFiles.get() 会在每次调用时将文件加载到内存中 .

    • 因此,在上面的示例中,它将被加载一次 .

    • 但是如果您在 .map() 内调用 SparkFiles.get() ,它会尝试为RDD中的每个条目重新加载文件 .

    最后,回答你的问题,

    文件读取次数是多少?特定节点上每个执行者一次?或每次任务一次?

    取决于 .get 的调用,如上所述 .

    步骤是什么,如何在执行者上处理代码?

    我不明白这一部分 .

    使用什么更好的addFile或bc var?

    这些是不同的用例 . 例如,考虑我们有1GB sqliteDB转储的情况 . Spark可以通过JDBC连接到此DB对象 . 它并不真正需要在内存中加载整个对象 .

    spark会根据pyspark代码进行任何优化并创建隐式的bc变量吗?

    不确定,但我不这么认为 .

相关问题