首页 文章

如何使用python的asyncio模块正确创建和运行并发任务?

提问于
浏览
55

我正在尝试使用Python 3的相对较新的asyncio模块正确理解并实现两个并发运行的Task对象 .

简而言之,asyncio似乎旨在通过事件循环处理异步进程和并发 Task 执行 . 它促进了 await (应用于异步函数)作为无回调方式等待和使用结果,而不会阻塞事件循环 . (期货和回调仍然是一个可行的选择 . )

它还提供 asyncio.Task() 类, Future 的专用子类,用于包装协同程序 . 最好使用 asyncio.ensure_future() 方法调用 . asyncio任务的预期用途是允许独立运行的任务与同一事件循环中的其他任务一起运行'concurrently' . 我的理解是 Tasks 连接到事件循环,然后自动保持在 await 语句之间驱动协程 .

我喜欢能够使用并发任务而不需要使用其中一个Executor类的想法,但我没有找到很多关于实现的详细说明 .

这就是我目前正在做的事情:

import asyncio

print('running async test')

async def say_boo():
    i = 0
    while True:
        await asyncio.sleep(0)
        print('...boo {0}'.format(i))
        i += 1

async def say_baa():
    i = 0
    while True:
        await asyncio.sleep(0)
        print('...baa {0}'.format(i))
        i += 1

# wrap in Task object
# -> automatically attaches to event loop and executes
boo = asyncio.ensure_future(say_boo())
baa = asyncio.ensure_future(say_baa())

loop = asyncio.get_event_loop()
loop.run_forever()

在尝试同时运行两个循环任务的情况下,我注意到除非任务具有内部 await 表达式,否则它将卡在 while 循环中,有效地阻止其他任务运行(很像正常的 while 循环) . 然而,一旦任务必须(a)等待,它们似乎同时运行而没有问题 .

因此, await 语句似乎为事件循环提供了在任务之间来回切换的立足点,从而产生了并发效果 .

内部 await 的示例输出:

running async test
...boo 0
...baa 0
...boo 1
...baa 1
...boo 2
...baa 2

没有内部 await 的示例输出:

...boo 0
...boo 1
...boo 2
...boo 3
...boo 4

问题

此实现是否通过 asyncio 中并发循环任务的'proper'示例?

这是否正确,这是唯一的方法是 Task 提供一个阻塞点( await 表达式),以便事件循环处理多个任务?

2 回答

  • 68

    您不一定需要 yield from x 来控制事件循环 .

    在你的例子中,我认为正确的方法是做一个 yield None 或等效的简单 yield ,而不是 yield from asyncio.sleep(0.001)

    import asyncio
    
    @asyncio.coroutine
    def say_boo():
      i = 0
      while True:
        yield None
        print("...boo {0}".format(i))
        i += 1
    
    @asyncio.coroutine
    def say_baa():
      i = 0
      while True:
        yield
        print("...baa {0}".format(i))
        i += 1
    
    boo_task = asyncio.async(say_boo())
    baa_task = asyncio.async(say_baa())
    
    loop = asyncio.get_event_loop()
    loop.run_forever()
    

    协同程序只是普通的Python生成器 . 在内部, asyncio 事件循环保留这些生成器的记录,并在永不停止的循环中逐个调用 gen.send() . 无论何时 yield ,对 gen.send() 的调用都会完成,循环可以继续 . (我正在简化它;看一下https://hg.python.org/cpython/file/3.4/Lib/asyncio/tasks.py#l265的实际代码)

    也就是说,如果你需要在不共享数据的情况下进行CPU密集型计算,我仍然会使用 run_in_executor 路由 .

  • 12

    是的,在事件循环中运行的任何协同程序都会阻止其他协同程序和任务运行,除非它

    • 使用 yield fromawait (如果使用Python 3.5)调用另一个协同程序 .

    • 退货 .

    这是因为 asyncio 是单线程的;事件循环运行的唯一方法是没有其他协同程序正在执行 . 使用 yield from / await 临时暂停协程,为事件循环提供工作机会 .

    您的示例代码很好,但在许多情况下,您可能不会在事件循环中执行异步I / O操作 . 在这些情况下,使用BaseEventLoop.run_in_executor在后台线程或进程中运行代码通常更有意义 . 如果你的任务是CPU限制的话, ProcessPoolExecutor 将是更好的选择,如果你需要做一些不友好的I / O,将使用 ThreadPoolExecutor .

    例如,您的两个循环完全受CPU限制,并且不共享任何状态,因此使用 ProcessPoolExecutor 在CPU之间并行运行每个循环可以获得最佳性能:

    import asyncio
    from concurrent.futures import ProcessPoolExecutor
    
    print('running async test')
    
    def say_boo():
        i = 0
        while True:
            print('...boo {0}'.format(i))
            i += 1
    
    
    def say_baa():
        i = 0
        while True:
            print('...baa {0}'.format(i))
            i += 1
    
    if __name__ == "__main__":
        executor = ProcessPoolExecutor(2)
        loop = asyncio.get_event_loop()
        boo = asyncio.ensure_future(loop.run_in_executor(executor, say_boo))
        baa = asyncio.ensure_future(loop.run_in_executor(executor, say_baa))
    
        loop.run_forever()
    

相关问题