首页 文章

Python Async Function中的多个Await

提问于
浏览
1

我在自定义类中使用aiohttp会话和信号量:

async def get_url(self, url):

    async with self.semaphore:
        async with self.session.get(url) as response:
            try:
                text_response = await response.text()
                read_response = await response.read()
                json_response = await response.json()
                await asyncio.sleep(random.uniform(0.1, 0.5))
            except aiohttp.client_exceptions.ContentTypeError:
                json_response = {}

            return {
                'json': json_response,
                'text': text_response,
                'read': read_response,
                'status': response.status,
                'url': response.url,
            }

我有两个问题:

  • 在单个异步函数中有多个await语句是正确/不正确的吗?我需要返回response.text()和response.read() . 但是,根据URL,response.json()可能可用,也可能不可用,因此我将所有内容抛入try / except块以捕获此异常 .

  • 由于我使用此函数循环遍历不同RESTful API endpoints 的列表,我控制通过信号量的同时请求数(设置为最大值100)但我还需要错开请求以使它们不是日志干扰主机 . 所以,我想我可以通过添加一个在0.1-0.5秒之间随机选择的asyncio.sleep来实现这一目标 . 这是在请求之间强制执行小等待的最佳方法吗?我应该将它移动到函数的开头而不是接近结尾吗?

1 回答

  • 2
    • 在一个异步函数中有多个等待是绝对正常的,只要你知道你正在等待什么,并且每个都在等待,就像非常正常的顺序执行一样 . 有关aiohttp的一点要提一下,你最好先调用 read() 并捕获 UnicodeDecodeError ,因为内部 text()json() 首先调用 read() 并处理其结果,你不希望处理阻止至少返回 read_response . 您不必担心多次调用 read() ,它只是在第一次调用时缓存在响应实例中 .

    • 随机交错是一种简单有效的突发交通解决方案 . 但是,如果您想要精确控制任意两个请求之间的最小时间间隔 - 出于学术原因,您可以设置两个信号量:

    def __init__(self):
        # something else
        self.starter = asyncio.Semaphore(0)
        self.ender = asyncio.Semaphore(30)
    

    然后更改 get_url() 以使用它们:

    async def get_url(self, url):
        await self.starter.acquire()
        try:
            async with self.session.get(url) as response:
                # your code
        finally:
            self.ender.release()
    

    因为 starter 初始化为零,所以所有 get_url() 协同程序都将在 starter 上阻塞 . 我们将使用单独的协程来控制它:

    async def controller(self):
        last = 0
        while self.running:
            await self.ender.acquire()
            sleep = 0.5 - (self.loop.time() - last)  # at most 2 requests per second
            if sleep > 0:
                await asyncio.sleep(sleep)
            last = self.loop.time()
            self.starter.release()
    

    你的主程序应该是这样的:

    def run(self):
        for url in [...]:
            self.loop.create_task(self.get_url(url))
        self.loop.create_task(self.controller())
    

    因此,首先,控制器将在15秒内均匀释放 starter 30次,因为这是 ender 的初始值 . 之后,如果任何 get_url() 结束,控制器将立即释放 starter ,如果自上次发布 starter 以来已经过了0.5秒,或者它将等到那个时间 .

    这里有一个问题:如果要获取的URL不是内存中的常量列表(例如,来自网络的URL经常出现不可预测的延迟),则RPS限制器将失败(在实际获取URL之前,启动器发布得太早) . 即使交通爆发的可能性已经非常低,您还需要对此案例进行进一步调整 .

相关问题