我有一个 pool_map
函数,可用于限制同时执行的函数的数量 .
我们的想法是让coroutine function接受一个映射到可能参数列表的单个参数,但也将所有函数调用包装到信号量采集中,这样一次只运行一个有限的数字:
from typing import Callable, Awaitable, Iterable, Iterator
from asyncio import Semaphore
A = TypeVar('A')
V = TypeVar('V')
async def pool_map(
func: Callable[[A], Awaitable[V]],
arg_it: Iterable[A],
size: int=10
) -> Generator[Awaitable[V], None, None]:
"""
Maps an async function to iterables
ensuring that only some are executed at once.
"""
semaphore = Semaphore(size)
async def sub(arg):
async with semaphore:
return await func(arg)
return map(sub, arg_it)
我修改了并没有为了一个例子测试上面的代码,但我的变体运行良好 . 例如 . 你可以像这样使用它:
from asyncio import get_event_loop, coroutine, as_completed
from contextlib import closing
URLS = [...]
async def run_all(awaitables):
for a in as_completed(awaitables):
result = await a
print('got result', result)
async def download(url): ...
if __name__ != '__main__':
pool = pool_map(download, URLS)
with closing(get_event_loop()) as loop:
loop.run_until_complete(run_all(pool))
但是,如果在等待未来时抛出异常,则会出现问题 . 我无法看到如何取消所有已安排或仍在运行的任务,也无法等待仍在等待获取信号量的任务 .
我不知道是否有图书馆或优雅的构建块,或者我是否必须自己构建所有部件? (即 Semaphore
可以访问其服务员, as_finished
提供对其正在运行的任务队列的访问权限,......)
2 回答
使用
ensure_future
获取Task
而不是协程:这是一个天真的解决方案,基于
cancel
是一个无操作的事实,如果任务已经完成: