我在自定义类中使用aiohttp会话和信号量:
async def get_url(self, url):
async with self.semaphore:
async with self.session.get(url) as response:
try:
text_response = await response.text()
read_response = await response.read()
json_response = await response.json()
await asyncio.sleep(random.uniform(0.1, 0.5))
except aiohttp.client_exceptions.ContentTypeError:
json_response = {}
return {
'json': json_response,
'text': text_response,
'read': read_response,
'status': response.status,
'url': response.url,
}
我有两个问题:
-
在单个异步函数中有多个await语句是正确/不正确的吗?我需要返回response.text()和response.read() . 但是,根据URL,response.json()可能可用,也可能不可用,因此我将所有内容抛入try / except块以捕获此异常 .
-
由于我使用此函数循环遍历不同RESTful API endpoints 的列表,我控制通过信号量的同时请求数(设置为最大值100)但我还需要错开请求以使它们不是日志干扰主机 . 所以,我想我可以通过添加一个在0.1-0.5秒之间随机选择的asyncio.sleep来实现这一目标 . 这是在请求之间强制执行小等待的最佳方法吗?我应该将它移动到函数的开头而不是接近结尾吗?
1 回答
在一个异步函数中有多个等待是绝对正常的,只要你知道你正在等待什么,并且每个都在等待,就像非常正常的顺序执行一样 . 有关aiohttp的一点要提一下,你最好先调用
read()
并捕获UnicodeDecodeError
,因为内部text()
和json()
首先调用read()
并处理其结果,你不希望处理阻止至少返回read_response
. 您不必担心多次调用read()
,它只是在第一次调用时缓存在响应实例中 .随机交错是一种简单有效的突发交通解决方案 . 但是,如果您想要精确控制任意两个请求之间的最小时间间隔 - 出于学术原因,您可以设置两个信号量:
然后更改
get_url()
以使用它们:因为
starter
初始化为零,所以所有get_url()
协同程序都将在starter
上阻塞 . 我们将使用单独的协程来控制它:你的主程序应该是这样的:
因此,首先,控制器将在15秒内均匀释放
starter
30次,因为这是ender
的初始值 . 之后,如果任何get_url()
结束,控制器将立即释放starter
,如果自上次发布starter
以来已经过了0.5秒,或者它将等到那个时间 .这里有一个问题:如果要获取的URL不是内存中的常量列表(例如,来自网络的URL经常出现不可预测的延迟),则RPS限制器将失败(在实际获取URL之前,启动器发布得太早) . 即使交通爆发的可能性已经非常低,您还需要对此案例进行进一步调整 .