很明显,为了更好地分配小型查找数据以使用广播变量 .
假设我们在纱线客户端模式下从主节点运行pySpark代码(spark submit) . 因此,应始终在主节点上创建应用程序驱动程序 . 我们从主节点上的本地路径读取文件 .
with open('/tmp/myfile.txt', 'r') as f:
lookup = {}
for line in f.readlines():
line = parse(line) # Method parse uses re and return dict
lookup[line['name']] = line['age']
然后我们创建 broadcast var 并使用它:
lookupBC = sc.broadcast(lookup)
output = sc.textFile('/path/to/hdfs/')\
.map(lambda e: (lookupBC.value.get(e, e), 1))\
.collect()
在我们的例子中,这个bc var是在驱动程序(主节点)上创建的,并且spark会在集群中的所有数据节点之间复制此var,其中创建执行程序将其保留在这些节点的内存中 . 因此,文件将被读取一次,然后分发给执行者 .
如果我们使用 addFile 选项会发生什么?
sc.addFile('/tmp/myfile.txt')
with open(SparkFiles.get('/tmp/myfile.txt')) as f:
lookup = {}
for line in f.readlines():
line = parse(line) # Method parse uses re and return dict
lookup[line['name']] = line['age']
output = sc.textFile('/path/to/hdfs/')\
.map(lambda e: (lookup.get(e, e), 1))\
.collect()
Spark会将文件 '/tmp/myfile.txt'
复制到每个节点,其中将创建执行程序 . 然后:
-
文件读取次数是多少?特定节点上每个执行者一次?或每次任务一次?
-
步骤是什么,如何在执行者上处理代码?
-
使用什么更好的addFile或bc var?
-
spark会根据pyspark代码进行任何优化并创建隐式bc变量吗?
在执行程序日志中,我看到有关bc vars的信息,但我在代码中不使用任何内容:
18/03/21 15:36:27 INFO util.Utils: Fetching spark://172.25.235.201:36478/files/myfile.txt to /data/disk01/yarn/nm/usercache/testuser/appcache/application_1520754626920_6227/spark-f3d19076-0642-4db8-961d-99daae0dfaff/fetchFileTemp230224632617642846.tmp
18/03/21 15:36:27 INFO util.Utils: Copying /data/disk01/yarn/nm/usercache/testuser/appcache/application_1520754626920_6227/spark-f3d19076-0642-4db8-961d-99daae0dfaff/-17884647971521635771454_cache to /data/disk01/yarn/nm/usercache/testuser/appcache/application_1520754626920_6227/container_1520754626920_6227_01_000002/./myfile.txt
18/03/21 15:36:28 INFO broadcast.TorrentBroadcast: Started reading broadcast variable 1
18/03/21 15:36:28 INFO client.TransportClientFactory: Successfully created connection to strt01we.ebb.er.com/172.25.235.216:43791 after 4 ms (0 ms spent in bootstraps)
18/03/21 15:36:28 INFO memory.MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 6.3 KB, free 366.3 MB)
18/03/21 15:36:28 INFO broadcast.TorrentBroadcast: Reading broadcast variable 1 took 551 ms
1 回答
广播变量似乎在内存中加载,直到它们被明确销毁 . 相比之下,
sc.addFile
似乎正在为磁盘创建一个副本(对于每个执行程序) . 所以我会猜到SparkFiles.get()
会在每次调用时将文件加载到内存中 .因此,在上面的示例中,它将被加载一次 .
但是如果您在
.map()
内调用SparkFiles.get()
,它会尝试为RDD中的每个条目重新加载文件 .最后,回答你的问题,
取决于
.get
的调用,如上所述 .我不明白这一部分 .
这些是不同的用例 . 例如,考虑我们有1GB sqliteDB转储的情况 . Spark可以通过JDBC连接到此DB对象 . 它并不真正需要在内存中加载整个对象 .
不确定,但我不这么认为 .