
Python: using multiprocessing on a pandas DataFrame


I want to use multiprocessing on a large dataset to find the distance between two GPS points. I constructed a test set, but I cannot get multiprocessing to work on it.

import pandas as pd
from geopy.distance import vincenty
from itertools import combinations
import multiprocessing as mp

df = pd.DataFrame({'ser_no': [1, 2, 3, 4, 5, 6, 7, 8, 9, 0],
                'co_nm': ['aa', 'aa', 'aa', 'bb', 'bb', 'bb', 'bb', 'cc', 'cc', 'cc'],
                'lat': [1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
                'lon': [21, 22, 23, 24, 25, 26, 27, 28, 29, 30]})



def calc_dist(x):
    return pd.DataFrame(
               [ [grp,
                  df.loc[c[0]].ser_no,
                  df.loc[c[1]].ser_no,
                  vincenty(df.loc[c[0], x], 
                           df.loc[c[1], x])
                 ]
                 for grp,lst in df.groupby('co_nm').groups.items()
                 for c in combinations(lst, 2)
               ],
               columns=['co_nm','machineA','machineB','distance'])

if __name__ == '__main__':
    pool = mp.Pool(processes = (mp.cpu_count() - 1))
    pool.map(calc_dist, ['lat','lon'])
    pool.close()
    pool.join()

I am running Python 2.7.11 and IPython 4.1.2 with Anaconda 2.5.0 64-bit on Windows 7 Professional, and I get the following error:

runfile('C:/.../Desktop/multiprocessing test.py', wdir='C:/.../Desktop')
Traceback (most recent call last):
  File "", line 1, in <module>
    runfile('C:/.../Desktop/multiprocessing test.py', wdir='C:/.../Desktop')
  File "C:...\Local\Continuum\Anaconda2\lib\site-packages\spyderlib\widgets\externalshell\sitecustomize.py", line 699, in runfile
    execfile(filename, namespace)
  File "C:...\Local\Continuum\Anaconda2\lib\site-packages\spyderlib\widgets\externalshell\sitecustomize.py", line 74, in execfile
    exec(compile(scripttext, filename, 'exec'), glob, loc)
  File "C:/.../multiprocessing test.py", line 33, in <module>
    pool.map(calc_dist, ['lat','lon'])
  File "C:...\AppData\Local\Continuum\Anaconda2\lib\multiprocessing\pool.py", line 251, in map
    return self.map_async(func, iterable, chunksize).get()
  File "C:...\Local\Continuum\Anaconda2\lib\multiprocessing\pool.py", line 567, in get
    raise self._value
TypeError: Failed to create Point instance from 1.

def get(self, timeout=None):
    self.wait(timeout)
    if not self._ready:
        raise TimeoutError
    if self._success:
        return self._value
    else:
        raise self._value

2 Answers

  • 1

    What is going wrong?

    This line in your code:

    pool.map(calc_dist, ['lat','lon'])
    

    spawns two processes - one runs calc_dist('lat') and the other runs calc_dist('lon'). Compare the first example in the docs. (Basically, pool.map(f, [1,2,3]) calls f three times with the arguments given in the list: f(1), f(2), and f(3).) If I am not mistaken, your function calc_dist can only be called as calc_dist('lat', 'lon'), and it does not allow for parallel processing.
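
    For illustration, here is a minimal, self-contained sketch of that behaviour (the function square is only a hypothetical example, not part of your code):

    import multiprocessing as mp

    def square(n):
        # each call runs in one of the worker processes
        return n * n

    if __name__ == '__main__':
        pool = mp.Pool(processes=2)
        # prints [1, 4, 9], i.e. [square(1), square(2), square(3)]
        print(pool.map(square, [1, 2, 3]))
        pool.close()
        pool.join()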

    Solution

    I believe what you want is to split the work between processes, probably by sending each tuple (grp, lst) to a separate process. The following code does exactly that.

    First, let's prepare for the split:

    grp_lst_args = list(df.groupby('co_nm').groups.items())
    
    print(grp_lst_args)
    [('aa', [0, 1, 2]), ('cc', [7, 8, 9]), ('bb', [3, 4, 5, 6])]
    

    We'll send each of these tuples (here, there are three of them) as an argument to a function in a separate process. We need to rewrite the function; let's call it calc_dist2. For convenience, its argument is a tuple, as in calc_dist2(('aa', [0, 1, 2])).

    def calc_dist2(arg):
        grp, lst = arg
        return pd.DataFrame(
                   [ [grp,
                      df.loc[c[0]].ser_no,
                      df.loc[c[1]].ser_no,
                      vincenty(df.loc[c[0], ['lat','lon']], 
                               df.loc[c[1], ['lat','lon']])
                     ]
                     for c in combinations(lst, 2)
                   ],
                   columns=['co_nm','machineA','machineB','distance'])
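
    As a quick sanity check (just a sketch, assuming the df defined above and geopy's vincenty), the rewritten function can be called directly on one of those tuples before involving the pool:

    # distances within group 'aa' only
    print(calc_dist2(('aa', [0, 1, 2])))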
    

    Now comes the multiprocessing:

    pool = mp.Pool(processes = (mp.cpu_count() - 1))
    results = pool.map(calc_dist2, grp_lst_args)
    pool.close()
    pool.join()
    
    results_df = pd.concat(results)
    

    results is a list of the results (here, DataFrames) of the calls calc_dist2((grp, lst)) for each (grp, lst) in grp_lst_args. The elements of results are then concatenated into a single DataFrame.

    print(results_df)
      co_nm  machineA  machineB          distance
    0    aa         1         2  156.876149391 km
    1    aa         1         3  313.705445447 km
    2    aa         2         3  156.829329105 km
    0    cc         8         9  156.060165391 km
    1    cc         8         0  311.910998169 km
    2    cc         9         0  155.851498134 km
    0    bb         4         5  156.665641837 km
    1    bb         4         6  313.214333025 km
    2    bb         4         7  469.622535339 km
    3    bb         5         6  156.548897414 km
    4    bb         5         7  312.957597466 km
    5    bb         6         7   156.40899677 km
    

    By the way, in Python 3 we can use the with construct:

    with mp.Pool() as pool:
        results = pool.map(calc_dist2, grp_lst_args)
    

    Update

    I have only tested this code on Linux. On Linux, the read-only DataFrame df can be accessed by the child processes without being copied into their memory space, but I am not sure how this works on Windows. You may consider splitting df into chunks (grouped by co_nm) and sending those chunks as arguments to some other version of calc_dist, as sketched below.
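
    A minimal sketch of that idea (calc_dist3 is a hypothetical name; each worker receives only its own per-group sub-DataFrame, so nothing relies on the children inheriting the global df):

    def calc_dist3(arg):
        # arg is a (group name, sub-DataFrame) pair produced by groupby
        grp, chunk = arg
        return pd.DataFrame(
                   [ [grp,
                      chunk.loc[a].ser_no,
                      chunk.loc[b].ser_no,
                      vincenty(chunk.loc[a, ['lat', 'lon']],
                               chunk.loc[b, ['lat', 'lon']])
                     ]
                     for a, b in combinations(chunk.index, 2)
                   ],
                   columns=['co_nm', 'machineA', 'machineB', 'distance'])

    if __name__ == '__main__':
        chunks = list(df.groupby('co_nm'))   # [(grp, sub-DataFrame), ...]
        pool = mp.Pool(processes=(mp.cpu_count() - 1))
        results_df = pd.concat(pool.map(calc_dist3, chunks))
        pool.close()
        pool.join()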

  • 18

    Strange. It seems to work under Python 2, but not under Python 3.

    Here is a minimally modified version that prints the output:

    import pandas as pd
    from geopy.distance import vincenty
    from itertools import combinations
    import multiprocessing as mp
    
    df = pd.DataFrame({'ser_no': [1, 2, 3, 4, 5, 6, 7, 8, 9, 0],
                    'co_nm': ['aa', 'aa', 'aa', 'bb', 'bb', 'bb', 'bb', 'cc', 'cc', 'cc'],
                    'lat': [1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
                    'lon': [21, 22, 23, 24, 25, 26, 27, 28, 29, 30]})
    
    
    
    def calc_dist(x):
        ret =  pd.DataFrame(
                   [ [grp,
                      df.loc[c[0]].ser_no,
                      df.loc[c[1]].ser_no,
                      vincenty(df.loc[c[0], x],
                               df.loc[c[1], x])
                     ]
                     for grp,lst in df.groupby('co_nm').groups.items()
                     for c in combinations(lst, 2)
                   ],
                   columns=['co_nm','machineA','machineB','distance'])
        print(ret)
        return ret
    
    if __name__ == '__main__':
        pool = mp.Pool(processes = (mp.cpu_count() - 1))
        pool.map(calc_dist, ['lat','lon'])
        pool.close()
        pool.join()
    

    This is the output from Python 2:

    0     aa         1         2  110.723608682 km
    1     aa         1         3  221.460709525 km
    2     aa         2         3  110.737100843 km
    3     cc         8         9  110.827576495 km
    4     cc         8         0  221.671650552 km
       co_nm  machineA  machineB          distance
    5     cc         9         0  110.844074057 km
    0     aa         1         2  110.575064814 km
    1     aa         1         3  221.151481337 km
    6     bb         4         5  110.765515243 km
    2     aa         2         3  110.576416524 km
    7     bb         4         6    221.5459187 km
    3     cc         8         9  110.598565514 km
    4     cc         8         0  221.203121352 km
    8     bb         4         7  332.341640771 km
    5     cc         9         0  110.604555838 km
    6     bb         4         5   110.58113908 km
    9     bb         5         6  110.780403457 km
    7     bb         4         6  221.165643396 km
    10    bb         5         7  221.576125528 km
    8     bb         4         7  331.754177186 km
    9     bb         5         6  110.584504316 km
    10    bb         5         7  221.173038106 km
    11    bb         6         7  110.795722071 km
    11    bb         6         7   110.58853379 km
    

    And this is the stack trace from Python 3:

    """
    Traceback (most recent call last):
      File "/usr/local/lib/python3.4/dist-packages/geopy/point.py", line 123, in __new__
        seq = iter(arg)
    TypeError: 'numpy.int64' object is not iterable
    
    During handling of the above exception, another exception occurred:
    
    Traceback (most recent call last):
      File "/usr/lib/python3.4/multiprocessing/pool.py", line 119, in worker
        result = (True, func(*args, **kwds))
      File "/usr/lib/python3.4/multiprocessing/pool.py", line 44, in mapstar
        return list(map(*args))
      File "gps.py", line 29, in calc_dist
        for grp, lst in df.groupby('co_nm').groups.items()
      File "gps.py", line 30, in <listcomp>
        for c in combinations(lst, 2)
      File "/usr/local/lib/python3.4/dist-packages/geopy/distance.py", line 322, in __init__
        super(vincenty, self).__init__(*args, **kwargs)
      File "/usr/local/lib/python3.4/dist-packages/geopy/distance.py", line 115, in __init__
        kilometers += self.measure(a, b)
      File "/usr/local/lib/python3.4/dist-packages/geopy/distance.py", line 342, in measure
        a, b = Point(a), Point(b)
      File "/usr/local/lib/python3.4/dist-packages/geopy/point.py", line 126, in __new__
        "Failed to create Point instance from %r." % (arg,)
    TypeError: Failed to create Point instance from 8.
    """
    
    The above exception was the direct cause of the following exception:
    
    Traceback (most recent call last):
      File "gps.py", line 38, in <module>
        pool.map(calc_dist, ['lat', 'lon'])
      File "/usr/lib/python3.4/multiprocessing/pool.py", line 260, in map
        return self._map_async(func, iterable, mapstar, chunksize).get()
      File "/usr/lib/python3.4/multiprocessing/pool.py", line 599, in get
        raise self._value
    TypeError: Failed to create Point instance from 8.
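
    The last frames of that trace show geopy's Point trying to iterate a single numpy.int64, i.e. vincenty is handed one scalar (df.loc[c[0], 'lat']) rather than a (lat, lon) pair. A tiny sketch, assuming the df from the question, that shows the difference:

    print(type(df.loc[0, 'lat']))            # a single scalar, e.g. numpy.int64
    print(list(df.loc[0, ['lat', 'lon']]))   # a proper coordinate pair: [1, 21]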
    

    I know this is not an answer, but maybe it helps...
