首页 文章

Python中的异步方法调用?

提问于
浏览
153

我想知道Python中是否有任何异步方法调用库 . 如果你可以做类似的事情会很棒

@async
def longComputation():
    <code>


token = longComputation()
token.registerCallback(callback_function)
# alternative, polling
while not token.finished():
    doSomethingElse()
    if token.finished():
        result = token.result()

或者异步调用非异步例程

def longComputation()
    <code>

token = asynccall(longComputation())

如果在语言核心中使用更加精细的策略,那就太棒了 . 这是考虑过吗?

12 回答

  • 126

    有没有理由不使用线程?您可以使用 threading 类 . 而不是 finished() 函数使用 isAlive() . result() 函数可以 join() 线程并检索结果 . 并且,如果可以,则覆盖 run()__init__ 函数以调用构造函数中指定的函数,并将值保存到类的实例中 .

  • 2

    您可以使用Python 2.6中添加的multiprocessing module . 您可以使用进程池,然后异步获取结果:

    apply_async(func[, args[, kwds[, callback]]])
    

    例如 . :

    from multiprocessing import Pool
    
    def f(x):
        return x*x
    
    if __name__ == '__main__':
        pool = Pool(processes=1)              # Start a worker processes.
        result = pool.apply_async(f, [10], callback) # Evaluate "f(10)" asynchronously calling callback when finished.
    

    这只是一种选择 . 该模块提供了许多设施来实现您的需求 . 从这里制作装饰器也很容易 .

  • 28

    就像是:

    import threading
    
    thr = threading.Thread(target=foo, args=(), kwargs={})
    thr.start() # Will run "foo"
    ....
    thr.is_alive() # Will return whether foo is running currently
    ....
    thr.join() # Will wait till "foo" is done
    

    有关详细信息,请参阅https://docs.python.org/2/library/threading.html#module-threading的文档;此代码也适用于Python 3 .

  • 5

    从Python 3.5开始,您可以使用增强型生成器来实现异步功能 .

    import asyncio
    import datetime
    

    Enhanced generator syntax:

    @asyncio.coroutine
    def display_date(loop):
        end_time = loop.time() + 5.0
        while True:
            print(datetime.datetime.now())
            if (loop.time() + 1.0) >= end_time:
                break
            yield from asyncio.sleep(1)
    
    
    loop = asyncio.get_event_loop()
    # Blocking call which returns when the display_date() coroutine is done
    loop.run_until_complete(display_date(loop))
    loop.close()
    

    New async/await syntax:

    async def display_date(loop):
        end_time = loop.time() + 5.0
        while True:
            print(datetime.datetime.now())
            if (loop.time() + 1.0) >= end_time:
                break
            await asyncio.sleep(1)
    
    
    loop = asyncio.get_event_loop()
    # Blocking call which returns when the display_date() coroutine is done
    loop.run_until_complete(display_date(loop))
    loop.close()
    
  • 36

    它不是语言核心,而是一个非常成熟的库,可以做你想要的Twisted . 它引入了Deferred对象,您可以将回调或错误处理程序("errbacks")附加到 . 延迟基本上是"promise",函数最终会得到一个结果 .

  • 7

    你可以实现一个装饰器来使你的函数异步,虽然这有点棘手 . multiprocessing 模块充满了小怪癖和看似随意的限制 - 更有理由将它封装在友好的界面之后 .

    from inspect import getmodule
    from multiprocessing import Pool
    
    
    def async(decorated):
        r'''Wraps a top-level function around an asynchronous dispatcher.
    
            when the decorated function is called, a task is submitted to a
            process pool, and a future object is returned, providing access to an
            eventual return value.
    
            The future object has a blocking get() method to access the task
            result: it will return immediately if the job is already done, or block
            until it completes.
    
            This decorator won't work on methods, due to limitations in Python's
            pickling machinery (in principle methods could be made pickleable, but
            good luck on that).
        '''
        # Keeps the original function visible from the module global namespace,
        # under a name consistent to its __name__ attribute. This is necessary for
        # the multiprocessing pickling machinery to work properly.
        module = getmodule(decorated)
        decorated.__name__ += '_original'
        setattr(module, decorated.__name__, decorated)
    
        def send(*args, **opts):
            return async.pool.apply_async(decorated, args, opts)
    
        return send
    

    下面的代码说明了装饰器的用法:

    @async
    def printsum(uid, values):
        summed = 0
        for value in values:
            summed += value
    
        print("Worker %i: sum value is %i" % (uid, summed))
    
        return (uid, summed)
    
    
    if __name__ == '__main__':
        from random import sample
    
        # The process pool must be created inside __main__.
        async.pool = Pool(4)
    
        p = range(0, 1000)
        results = []
        for i in range(4):
            result = printsum(i, sample(p, 100))
            results.append(result)
    
        for result in results:
            print("Worker %i: sum value is %i" % result.get())
    

    在一个真实的案例中,我会在装饰器上详细说明一下,提供一些方法来关闭它以进行调试(同时保持未来的接口),或者可能是处理异常的工具;但我认为这足以证明这一原则 .

  • 13

    只是

    import threading, time
    
    def f():
        print "f started"
        time.sleep(3)
        print "f finished"
    
    threading.Thread(target=f).start()
    
  • 0

    我的解决方案是:

    import threading
    
    class TimeoutError(RuntimeError):
        pass
    
    class AsyncCall(object):
        def __init__(self, fnc, callback = None):
            self.Callable = fnc
            self.Callback = callback
    
        def __call__(self, *args, **kwargs):
            self.Thread = threading.Thread(target = self.run, name = self.Callable.__name__, args = args, kwargs = kwargs)
            self.Thread.start()
            return self
    
        def wait(self, timeout = None):
            self.Thread.join(timeout)
            if self.Thread.isAlive():
                raise TimeoutError()
            else:
                return self.Result
    
        def run(self, *args, **kwargs):
            self.Result = self.Callable(*args, **kwargs)
            if self.Callback:
                self.Callback(self.Result)
    
    class AsyncMethod(object):
        def __init__(self, fnc, callback=None):
            self.Callable = fnc
            self.Callback = callback
    
        def __call__(self, *args, **kwargs):
            return AsyncCall(self.Callable, self.Callback)(*args, **kwargs)
    
    def Async(fnc = None, callback = None):
        if fnc == None:
            def AddAsyncCallback(fnc):
                return AsyncMethod(fnc, callback)
            return AddAsyncCallback
        else:
            return AsyncMethod(fnc, callback)
    

    并按照要求完成工作:

    @Async
    def fnc():
        pass
    
  • 19

    你可以使用eventlet . 它允许您编写看似同步代码的内容,但让它在网络上异步操作 .

    以下是超级最小爬虫的示例:

    urls = ["http://www.google.com/intl/en_ALL/images/logo.gif",
         "https://wiki.secondlife.com/w/images/secondlife.jpg",
         "http://us.i1.yimg.com/us.yimg.com/i/ww/beta/y3.gif"]
    
    import eventlet
    from eventlet.green import urllib2
    
    def fetch(url):
    
      return urllib2.urlopen(url).read()
    
    pool = eventlet.GreenPool()
    
    for body in pool.imap(fetch, urls):
      print "got body", len(body)
    
  • 0

    这样的东西对我有用,然后你可以调用该函数,它会将自己调度到一个新的线程上 .

    from thread import start_new_thread
    
    def dowork(asynchronous=True):
        if asynchronous:
            args = (False)
            start_new_thread(dowork,args) #Call itself on a new thread.
        else:
            while True:
                #do something...
                time.sleep(60) #sleep for a minute
        return
    
  • 184

    您可以使用concurrent.futures(在Python 3.2中添加) .

    import time
    from concurrent.futures import ThreadPoolExecutor
    
    
    def long_computation(duration):
        for x in range(0, duration):
            print(x)
            time.sleep(1)
        return duration * 2
    
    
    print('Use polling')
    with ThreadPoolExecutor(max_workers=1) as executor:
        future = executor.submit(long_computation, 5)
        while not future.done():
            print('waiting...')
            time.sleep(0.5)
    
        print(future.result())
    
    print('Use callback')
    executor = ThreadPoolExecutor(max_workers=1)
    future = executor.submit(long_computation, 5)
    future.add_done_callback(lambda f: print(f.result()))
    
    print('waiting for callback')
    
    executor.shutdown(False)  # non-blocking
    
    print('shutdown invoked')
    
  • 7

    您可以使用流程 . 如果你想永远运行它,请使用while(如网络)功能:

    from multiprocessing import Process
    def foo():
        while 1:
            # Do something
    
    p = Process(target = foo)
    p.start()
    

    如果您只想运行一次,请执行以下操作:

    from multiprocessing import Process
    def foo():
        # Do something
    
    p = Process(target = foo)
    p.start()
    p.join()
    

相关问题