How to make an asyncio pool cancellable?

I have a pool_map function that can be used to limit the number of functions executing concurrently.

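The idea is to take a coroutine function that accepts a single argument and map it over a list of possible arguments, while also wrapping every call in a semaphore acquisition, so that only a limited number of calls run at once: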

from typing import Awaitable, Callable, Iterable, Iterator, TypeVar
from asyncio import Semaphore

A = TypeVar('A')
V = TypeVar('V')

# A plain function (not `async def`): it only builds a lazy map of
# coroutine objects, which the caller then awaits.
def pool_map(
    func: Callable[[A], Awaitable[V]],
    arg_it: Iterable[A],
    size: int=10
) -> Iterator[Awaitable[V]]:
    """
    Maps an async function to iterables
    ensuring that only some are executed at once.
    """
    semaphore = Semaphore(size)

    async def sub(arg):
        # Each call holds one semaphore slot for as long as it runs.
        async with semaphore:
            return await func(arg)

    return map(sub, arg_it)

I adapted the code above for this example without testing it, but my own variant works fine. For example, you could use it like this:

from asyncio import get_event_loop, as_completed
from contextlib import closing

URLS = [...]

async def run_all(awaitables):
    # as_completed() yields awaitables in the order they finish.
    for a in as_completed(awaitables):
        result = await a
        print('got result', result)

async def download(url): ...


if __name__ == '__main__':
    pool = pool_map(download, URLS)

    with closing(get_event_loop()) as loop:
        loop.run_until_complete(run_all(pool))
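One way to check that pool_map really caps concurrency is to count how many calls are in flight at the same time. The following is a minimal, self-contained sketch; measure_peak and work are hypothetical names, and pool_map is restated as a plain def so that its result can be unpacked straight into asyncio.gather:

```python
import asyncio
from typing import Awaitable, Callable, Iterable, Iterator, TypeVar

A = TypeVar("A")
V = TypeVar("V")


def pool_map(
    func: Callable[[A], Awaitable[V]],
    arg_it: Iterable[A],
    size: int = 10,
) -> Iterator[Awaitable[V]]:
    # Same idea as in the question: every call runs inside a shared semaphore.
    semaphore = asyncio.Semaphore(size)

    async def sub(arg: A) -> V:
        async with semaphore:
            return await func(arg)

    return map(sub, arg_it)


async def measure_peak(size: int, n: int) -> int:
    # Hypothetical harness: records the highest number of concurrent calls.
    running = 0
    peak = 0

    async def work(i: int) -> int:
        nonlocal running, peak
        running += 1
        peak = max(peak, running)
        await asyncio.sleep(0.01)
        running -= 1
        return i

    # gather() keeps results in argument order.
    results = await asyncio.gather(*pool_map(work, range(n), size))
    assert results == list(range(n))
    return peak


print(asyncio.run(measure_peak(size=3, n=10)))  # prints 3
```

With size=3 the peak is 3 and never more, because each call holds a semaphore slot for its whole duration.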

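However, there is a problem if an exception is raised while awaiting one of the futures. I can't see how to cancel all the tasks that are scheduled or still running, nor the ones still waiting to acquire the semaphore.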
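The problem can be reproduced in a few lines. In this sketch (work and main are hypothetical names), one job fails early while the others are still sleeping; the exception surfaces through as_completed, but nothing cancels the sibling tasks, so they have to be cleaned up by hand:

```python
import asyncio


async def work(n: int) -> int:
    # Hypothetical job: job 1 fails almost immediately, the others run long.
    if n == 1:
        await asyncio.sleep(0.01)
        raise RuntimeError("boom")
    await asyncio.sleep(10)
    return n


async def main() -> list[bool]:
    tasks = [asyncio.ensure_future(work(n)) for n in range(1, 5)]
    try:
        for fut in asyncio.as_completed(tasks):
            print("got result", await fut)
    except RuntimeError as e:
        print("failed:", e)
    # The exception did not cancel anything: the siblings are still pending.
    pending = [not t.done() for t in tasks]
    # Cancel them explicitly and wait until they have actually finished.
    for t in tasks:
        t.cancel()
    await asyncio.wait(tasks)
    return pending


print(asyncio.run(main()))  # prints "failed: boom", then [False, True, True, True]
```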

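I don't know whether there is a library or an elegant building block for this, or whether I have to build all the pieces myself (e.g. a Semaphore that exposes its waiters, an as_completed that exposes its queue of running tasks, ...).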

2 Answers

  •

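    Use ensure_future to get a Task instead of a bare coroutine; Tasks are scheduled immediately and, unlike coroutines, can be cancelled later: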

    import asyncio
    from contextlib import closing
    
    
    def pool_map(func, args, size=10):
        """
        Maps an async function to iterables
        ensuring that only some are executed at once.
        """
        semaphore = asyncio.Semaphore(size)
    
        async def sub(arg):
            async with semaphore:
                return await func(arg)
    
        tasks = [asyncio.ensure_future(sub(x)) for x in args]
    
        return tasks
    
    
    async def f(n):
        print(">>> start", n)
    
        if n == 7:
            raise Exception("boom!")
    
        await asyncio.sleep(n / 10)
    
        print("<<< end", n)
        return n
    
    
    async def run_all(tasks):
        exc = None
        for a in asyncio.as_completed(tasks):
            try:
                result = await a
                print('=== result', result)
            except asyncio.CancelledError as e:
                print("!!! cancel", e)
            except Exception as e:
                print("Exception in task, cancelling!")
                for t in tasks:
                    t.cancel()
                exc = e
        if exc:
            raise exc
    
    
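    async def main():
        # Create the tasks while the loop is running: calling
        # ensure_future() outside a running loop relies on implicit
        # get_event_loop() behaviour that is deprecated in newer Pythons.
        pool = pool_map(f, range(1, 20), 3)
        await run_all(pool)
    
    
    asyncio.run(main())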
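The question also asks about tasks that are still waiting to acquire the semaphore. Those are ordinary pending tasks, so the t.cancel() loop in this answer reaches them too. The following minimal sketch (holder and waiter are hypothetical names) cancels a task while it is blocked in Semaphore.acquire() and checks that it did not consume a slot:

```python
import asyncio


async def main() -> tuple[bool, bool, bool]:
    semaphore = asyncio.Semaphore(1)
    release = asyncio.Event()

    async def holder() -> None:
        # Takes the only slot and keeps it until `release` is set.
        async with semaphore:
            await release.wait()

    async def waiter() -> str:
        # Blocks inside acquire() because `holder` owns the slot.
        async with semaphore:
            return "acquired"

    h = asyncio.ensure_future(holder())
    await asyncio.sleep(0)  # let `holder` acquire the semaphore
    w = asyncio.ensure_future(waiter())
    await asyncio.sleep(0)  # let `waiter` block in acquire()

    w.cancel()
    try:
        await w
    except asyncio.CancelledError:
        pass

    release.set()
    await h
    # The cancelled waiter never took a slot, so the semaphore is free again.
    return w.cancelled(), semaphore.locked(), h.done()


print(asyncio.run(main()))  # (True, False, True)
```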
    
  •

    Here is a naive solution, relying on the fact that cancel() is a no-op if the task has already finished:

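    import asyncio
    
    async def run_all(awaitables):
        # Wrap every awaitable in a Task so each one can be cancelled later.
        futures = [asyncio.ensure_future(a) for a in awaitables]
        try:
            for fut in asyncio.as_completed(futures):
                result = await fut
                print('got result', result)
        except BaseException:
            # cancel() does nothing for tasks that have already finished.
            for future in futures:
                future.cancel()
            # Wait until every cancellation has actually been processed.
            await asyncio.wait(futures)
            # Re-raise so the caller still sees the original error.
            raise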
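The no-op property this answer relies on is easy to check: Task.cancel() returns False and changes nothing when the task is already done, and returns True when it actually requests cancellation. A minimal sketch, with job as a hypothetical name:

```python
import asyncio


async def main() -> tuple[bool, int, bool, bool]:
    async def job() -> int:
        return 42

    done_task = asyncio.ensure_future(job())
    await done_task  # finishes normally
    # cancel() on a finished task is a no-op and returns False ...
    accepted_done = done_task.cancel()

    # ... while on a pending task it requests cancellation and returns True.
    pending_task = asyncio.ensure_future(asyncio.sleep(10))
    accepted_pending = pending_task.cancel()
    # As in the answer, wait until the cancellation has taken effect.
    await asyncio.wait([pending_task])

    return accepted_done, done_task.result(), accepted_pending, pending_task.cancelled()


print(asyncio.run(main()))  # (False, 42, True, True)
```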
    
