首页 文章

将同步请求包装到asyncio(async / await)?

提问于
浏览
7

我在Python 3.6中编写了一个工具,它将请求发送到多个API(带有各种 endpoints ),并收集它们的解析来解析并将它们保存在数据库中 .

我使用的API客户端具有请求URL的 synchronous 版本,例如他们使用的

urllib.request.Request('...

或者他们使用Kenneth Reitz' Requests 库 .

由于我的API调用依赖于请求URL的同步版本,因此整个过程需要几分钟才能完成 .

现在我想将我的API调用包装在async / await(asyncio)中 . 我正在使用python 3.6 .

我发现的所有示例/教程都要我将同步URL调用/ requests 更改为它的异步版本(例如 aiohttp ) . 由于我的代码依赖于我没有改变的API客户端,所以我需要保持不变的代码 .

So is there a way to wrap my synchronous requests (blocking code) in async/await to make them run in an event loop?

我是Python新手中的asyncio . 这在NodeJS中是不费吹灰之力的 . 但我不能用Python来解决这个问题 .

1 回答

  • 6

    解决方案是将同步代码包装在线程中并以此方式运行 . 我用了那个确切的系统让我的 asyncio 代码运行 boto3 (注意:如果运行<python3.6,删除内联类型提示):

    async def get(self, key: str) -> bytes:
        s3 = boto3.client("s3")
        loop = asyncio.get_event_loop()
        try:
            response: typing.Mapping = \
                await loop.run_in_executor(  # type: ignore
                    None, functools.partial(
                        s3.get_object,
                        Bucket=self.bucket_name,
                        Key=key))
        except botocore.exceptions.ClientError as e:
            if e.response["Error"]["Code"] == "NoSuchKey":
                raise base.KeyNotFoundException(self, key) from e
            elif e.response["Error"]["Code"] == "AccessDenied":
                raise base.AccessDeniedException(self, key) from e
            else:
                raise
        return response["Body"].read()
    

    请注意,这将起作用,因为 s3.get_object() 代码中的大量时间花在等待I / O上,并且(通常)在等待I / O python释放GIL时(GIL是python中通常线程的原因)不是一个好主意) .

    run_in_executor 中的第一个参数 None 表示我们在默认执行程序中运行 . 这是一个线程池 Actuator ,但它可以使事情更明确地在那里显式地分配线程池 Actuator .

    请注意,在使用纯异步I / O的情况下,您可以轻松地同时打开数千个连接,使用线程池执行程序意味着每个API的并发调用都需要一个单独的线程 . 一旦您的池中的线程用完,线程池将不会安排您的新调用,直到线程可用 . 显然你可以提高线程数,但这会占用内存;不要指望能够超过几千 .

    另请参阅the python ThreadPoolExecutor docs以获取有关如何在异步代码中包装同步调用的说明和一些略有不同的代码 .

相关问题