我有一个大的查找表,它将整数作为键和字符串列表作为值 . 我需要这个查找表来对我通过spark加载的数据进行一些过滤和转换 .
import numpy as np
import pandas as pd
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext
conf = SparkConf()
conf.setMaster("local[20]")
conf.setAppName("analysis")
conf.set("spark.local.dir", "./tmp")
#conf.set("spark.executor.memory", "20g")
#conf.set("spark.driver.memory", "20g")
conf.set("spark.python.worker.reuse", "yes")
sc = SparkContext(conf=conf)
sqlc = SQLContext(sc)
在启动pyspark时,我甚至使用了 --driver-memory 20g
选项 .
我的机器有500 GB内存和27个内核 . 我首先在内存中加载一个名为 lookup_tbl
的字典,该字典有17457954行 .
当我尝试运行以下代码时,我得到的输出超过10分钟 . 等了这么久之后,我关闭了这个过程 . 我需要查找表功能 . 我甚至尝试过使用 broadcast
功能 .
sc.broadcast(lookup_tbl)
def clean_data(x, transform=lambda k: (int(k[0]), "\t".join(k[1:]))):
x = x.split('\t')
return transform(x)
def check_self(x):
from_id = x[0]
to_id = x[1]
self_ = 1
try:
common_items = set(lookup_tbl[from_id]).intersection(set(lookup_tbl[to_id]))
except KeyError:
common_items = set()
if len(common_items ) < 1:
common_items = set("-")
self_ = 0
return (((from_id, to_id, k, self_) for k in common_items ))
pair = sc.textFile("data/pair.tsv").map(lambda x: clean_data(x, transform=lambda k: (int(k[0]), int(k[1])))).flatMap(check_self)
csv_data = pair.map(lambda x: "\t".join("%s" for k in xrange(len(x))) % x)
csv_data.saveAsTextFile("out/pair_transformed")
这是火花的问题还是我没有正确运行?此外,我已尝试为执行程序和驱动程序内存设置各种值( ~20g
),但没有任何改进 .
根据我的理解,spark首先尝试将此字典序列化,然后再将其发送到所有本地进程 . 有没有办法可以从常用位置使用这个字典?
1 回答
首先要访问广播变量,你必须使用它的
value
属性:为避免广播,您可以在
mapPartitions
内本地加载lookup_tbl
:如果
lookup_tbl
相对较大,它仍然非常昂贵 . 有很多方法可以解决这个问题:它很容易设置,如果数据被正确编入索引,它应该足够快
使用单个数据库服务器进行查找 . MongoDB应该可以正常工作,并且通过适当的内存映射可以显着减少总体内存占用
使用
join
而不是广播