我想在大型数据集上使用 multiprocessing
来查找两个gps点之间的距离 . 我构建了一个测试集,但是我无法让 multiprocessing
在这个集合上工作 .
import pandas as pd
from geopy.distance import vincenty
from itertools import combinations
import multiprocessing as mp
df = pd.DataFrame({'ser_no': [1, 2, 3, 4, 5, 6, 7, 8, 9, 0],
'co_nm': ['aa', 'aa', 'aa', 'bb', 'bb', 'bb', 'bb', 'cc', 'cc', 'cc'],
'lat': [1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
'lon': [21, 22, 23, 24, 25, 26, 27, 28, 29, 30]})
def calc_dist(x):
return pd.DataFrame(
[ [grp,
df.loc[c[0]].ser_no,
df.loc[c[1]].ser_no,
vincenty(df.loc[c[0], x],
df.loc[c[1], x])
]
for grp,lst in df.groupby('co_nm').groups.items()
for c in combinations(lst, 2)
],
columns=['co_nm','machineA','machineB','distance'])
if __name__ == '__main__':
pool = mp.Pool(processes = (mp.cpu_count() - 1))
pool.map(calc_dist, ['lat','lon'])
pool.close()
pool.join()
当发生此错误时,我在Windows7 Professional上使用Python 2.7.11和Ipython 4.1.2与Anaconda 2.5.0 64位 .
runfile('C:/.../ Desktop / multiprocessing test.py',wdir ='C:/.../ Desktop')Traceback(最近一次调用最后一次):文件“”,第1行,在runfile中( 'C:/.../ Desktop / multiprocessing test.py',wdir ='C:/.../ Desktop')文件“C:... \ Local \ Continuum \ Anaconda2 \ lib \ site-packages \ spyderlib \ widgets \ externalshell \ sitecustomize.py“,第699行,在runfile execfile(文件名,命名空间)文件”C:... \ Local \ Continuum \ Anaconda2 \ lib \ site-packages \ spyderlib \ widgets \ externalshell \ sitecustomize.py “,第74行,在execfile exec中(compile(scripttext,filename,'exec'),glob,loc)文件”C:/..../ multiprocessing test.py“,第33行,在pool.map中(calc_dist, ['lat','lon'])文件“C:... \ AppData \ Local \ Continuum \ Anaconda2 \ lib \ multiprocessing \ pool.py”,第251行,在map中返回self.map_async(func,iterable,chunksize) ).get()文件“C:... \ Local \ Continuum \ Anaconda2 \ lib \ multiprocessing \ pool.py”,第567行,in get raise self._value TypeError:无法从1创建Point实例 .
def get(self, timeout=None):
self.wait(timeout)
if not self._ready:
raise TimeoutError
if self._success:
return self._value
else:
raise self._value
2 回答
怎么了?
你的代码中的这一行:
产生2个进程 - 一个运行
calc_dist('lat')
,另一个运行calc_dist('lon')
. 比较doc中的第一个例子 . (基本上,pool.map(f, [1,2,3])
使用下面列表中给出的参数调用f
三次:f(1)
,f(2)
和f(3)
. )如果我没有弄错,你的函数calc_dist
只能被称为calc_dist('lat', 'lon')
. 它不允许并行处理 .解决方案
我相信你想要在进程之间拆分工作,可能会将每个元组
(grp, lst)
发送到一个单独的进程 . 以下代码就是这样做的 .首先,让我们为拆分做准备:
我们'll send each of these tuples (here, there are three of them) as an argument to a function in a separate process. We need to rewrite the function, let' s称之为
calc_dist2
. 为方便起见,它的参数是calc_dist2(('aa',[0,1,2]))
中的元组现在来了多处理:
results
是grp_lst_args
中(grp,lst)
的调用calc_dist2((grp,lst))
的结果列表(此处为数据框) .results
的元素稍后连接到一个数据帧 .顺便说一句,在Python 3中我们可以使用
with
构造:Update
我只在linux上测试过这段代码 . 在linux上,只读数据框
df
可以被子进程访问,而不是复制到它们的内存空间,但我不确定它在Windows上是如何工作的 . 您可以考虑将df
拆分为块(按co_nm
分组)并将这些块作为参数发送到calc_dist
的某个其他版本 .奇怪 . 它似乎在python2下工作,但不是python3 .
这是打印输出的最小修改版本:
这是python2的输出
这是来自python3的堆栈跟踪
我知道这不是答案,但也许有帮助......