首页 文章

使用python joblib调用并行类函数

提问于
浏览
0

可以使用joblib在python中对函数进行多次调用 .

from joblib import Parallel, delayed 

def normal(x):
    print "Normal", x
    return x**2

if  __name__ == '__main__':

    results = Parallel(n_jobs=2)(delayed(normal)(x) for x in range(20))
    print results

给: [0, 1, 4, 9, 16, 25, 36, 49, 64, 81, 100, 121, 144, 169, 196, 225, 256, 289, 324, 361]

但是,我真正想要的是并行地在类实例列表上调用类函数 . 该函数只存储一个类变量 . 然后我会访问这个变量 .

from joblib import Parallel, delayed 

class A(object):
    def __init__(self, x):
        self.x = x
    def p(self):
        self.y = self.x**2

if  __name__ == '__main__':

    runs = [A(x) for x in range(20)]
    Parallel(n_jobs=4)(delayed(run.p() for run in runs))
    for run in runs:
        print run.y

这给出了一个错误:

Traceback(最近一次调用最后一次):文件“”,第1行,在runfile中('G:/ My Drive / CODE / stackoverflow / parallel_classfunc / parallel_classfunc.py',wdir ='G:/ My Drive / CODE / stackoverflow / parallel_classfunc')文件“C:\ ProgramData \ Anaconda2 \ lib \ site-packages \ spyder \ utils \ site \ sitecustomize.py”,第710行,在runfile execfile(文件名,命名空间)文件“C:\ ProgramData \ anaconda2 \ lib \ site-packages \ spyder \ utils \ site \ sitecustomize.py“,第86行,在execfile exec中(compile(scripttext,filename,'exec'),glob,loc)文件”G:/ My Drive / CODE / stackoverflow / parallel_classfunc / parallel_classfunc.py“,第12行,并行(n_jobs = 4)(延迟(run.p()运行中运行))文件”C:\ ProgramData \ Anaconda2 \ lib \ site-packages \ joblib \ parallel . py“,第183行,在延迟的pickle.dumps(函数)文件”C:\ ProgramData \ Anaconda2 \ lib \ copy_reg.py“,第70行,在_reduce_ex引发TypeError,”不能pickle%s对象“%base . name TypeError:无法pickle生成器对象

如何将joblib用于这样的类?还是有更好的方法?

1 回答

  • 1

    如何将joblib用于这样的类?

    我们先提出一些代码抛光:

    并非所有事情都符合 joblib.Parallel()( delayed() ) 呼叫签名功能:

    # >>> type( runs )                        <type 'list'>
    # >>> type( runs[0] )                     <class '__main__.A'>
    # >>> type( run.p() for run in runs )     <type 'generator'>
    

    所以,让我们让DEMO对象传递"through" aContainerFUN()

    StackOverflow_DEMO_joblib.Parallel.py :

    from sklearn.externals.joblib import Parallel, delayed
    import time
    
    class A( object ):
    
        def __init__( self, x ):
            self.x = x
            self.y = "Defined on .__init__()"
    
        def p(        self ):
            self.y = self.x**2
    
    def aNormalFUN( aValueOfX ):
        time.sleep( float( aValueOfX ) / 10. )
        print ": aNormalFUN() has got aValueOfX == {0:} to process.".format( aValueOfX )
        return aValueOfX * aValueOfX
    
    def aContainerFUN( aPayloadOBJECT ):
        time.sleep( float( aPayloadOBJECT.x ) / 10. )
        # try: except: finally:
        pass;  aPayloadOBJECT.p()
        print  "| aContainerFUN: has got aPayloadOBJECT.id({0:}) to process. [ Has made .y == {1:}, given .x == {2: } ]".format( id( aPayloadOBJECT ), aPayloadOBJECT.y, aPayloadOBJECT.x )
        time.sleep( 1 )
    
    if __name__ == '__main__':
         # ------------------------------------------------------------------
         results = Parallel( n_jobs = 2
                             )(       delayed( aNormalFUN )( aParameterX )
                             for                             aParameterX in range( 11, 21 )
                             )
         print results
         print '.'
         # ------------------------------------------------------------------
         pass;       runs = [ A( x ) for x in range( 11, 21 ) ]
         # >>> type( runs )                        <type 'list'>
         # >>> type( runs[0] )                     <class '__main__.A'>
         # >>> type( run.p() for run in runs )     <type 'generator'>
    
         Parallel( verbose = 10,
                   n_jobs  = 2
                   )(        delayed( aContainerFUN )( run )
                   for                                 run in runs
                   )
    

    结果?充当魅力!

    C:\ Python27.anaconda> python StackOverflow_DEMO_joblib.Parallel.py

    : aNormalFUN() has got aValueOfX == 11 to process.
    : aNormalFUN() has got aValueOfX == 12 to process.
    : aNormalFUN() has got aValueOfX == 13 to process.
    : aNormalFUN() has got aValueOfX == 14 to process.
    : aNormalFUN() has got aValueOfX == 15 to process.
    : aNormalFUN() has got aValueOfX == 16 to process.
    : aNormalFUN() has got aValueOfX == 17 to process.
    : aNormalFUN() has got aValueOfX == 18 to process.
    : aNormalFUN() has got aValueOfX == 19 to process.
    : aNormalFUN() has got aValueOfX == 20 to process.
    [121, 144, 169, 196, 225, 256, 289, 324, 361, 400]
    .
    | aContainerFUN: has got aPayloadOBJECT.id(50369168) to process. [ Has made .y == 121, given .x ==  11 ]
    | aContainerFUN: has got aPayloadOBJECT.id(50369168) to process. [ Has made .y == 144, given .x ==  12 ]
    [Parallel(n_jobs=2)]: Done   1 tasks      | elapsed:    2.4s
    | aContainerFUN: has got aPayloadOBJECT.id(12896752) to process. [ Has made .y == 169, given .x ==  13 ]
    | aContainerFUN: has got aPayloadOBJECT.id(12896752) to process. [ Has made .y == 196, given .x ==  14 ]
    [Parallel(n_jobs=2)]: Done   4 tasks      | elapsed:    4.9s
    | aContainerFUN: has got aPayloadOBJECT.id(12856464) to process. [ Has made .y == 225, given .x ==  15 ]
    | aContainerFUN: has got aPayloadOBJECT.id(12856464) to process. [ Has made .y == 256, given .x ==  16 ]
    | aContainerFUN: has got aPayloadOBJECT.id(50368592) to process. [ Has made .y == 289, given .x ==  17 ]
    | aContainerFUN: has got aPayloadOBJECT.id(50368592) to process. [ Has made .y == 324, given .x ==  18 ]
    | aContainerFUN: has got aPayloadOBJECT.id(12856528) to process. [ Has made .y == 361, given .x ==  19 ]
    | aContainerFUN: has got aPayloadOBJECT.id(12856528) to process. [ Has made .y == 400, given .x ==  20 ]
    [Parallel(n_jobs=2)]: Done  10 out of  10 | elapsed:   13.3s finished
    

相关问题