一旦所有HTTP请求都返回,我想启动大量HTTP请求并收集结果 . 使用 asyncio
可以以非阻塞方式发送请求,但我在收集结果时遇到问题 .
我知道针对这个特定问题的解决方案,如aiohttp . 但HTTP请求只是一个例子,我的问题是如何正确使用 asyncio
.
在服务器端,我有一个烧瓶,用"Hello World!"回复 localhost/
的每个请求,但它在回答之前等待0.1秒 . 在我的所有例子中,我发送了10个请求 . 同步代码大约需要1秒钟,异步版本可以在0.1秒内完成 .
在客户端,我想同时启动许多请求并收集他们的结果 . 我试图以三种不同的方式做到这一点 . 由于asyncio需要执行程序来解决阻塞代码,因此所有方法都调用 loop.run_in_executor
.
这段代码在它们之间共享:
import requests
from time import perf_counter
import asyncio
loop = asyncio.get_event_loop()
async def request_async():
r = requests.get("http://127.0.0.1:5000/")
return r.text
def request_sync():
r = requests.get("http://127.0.0.1:5000/")
return r.text
Approach 1:
在任务列表中使用 asyncio.gather()
,然后 run_until_complete
. 阅读Asyncio.gather vs asyncio.wait之后,似乎聚集会等待结果 . 但它没有't. So this code returns instantly, without waiting for the requests to finish. If I use a blocking function here, this works. Why can'我使用异步功能?
# approach 1
start = perf_counter()
tasks = []
for i in range(10):
tasks.append(loop.run_in_executor(None, request_async)) # <---- using async function !
gathered_tasks = asyncio.gather(*tasks)
results = loop.run_until_complete(gathered_tasks)
stop = perf_counter()
print(f"finished {stop - start}") # 0.003
# approach 1(B)
start = perf_counter()
tasks = []
for i in range(10):
tasks.append(loop.run_in_executor(None, request_sync)) # <---- using sync function
gathered_tasks = asyncio.gather(*tasks)
results = loop.run_until_complete(gathered_tasks)
stop = perf_counter()
print(f"finished {stop - start}") # 0.112
Python甚至警告我, coroutine "request_async"
从未等待过 . 此时,我有一个可行的解决方案:在执行程序中使用普通(非异步)函数 . 但我希望有一个适用于 async
函数定义的解决方案 . 因为我想在它们里面使用 await
(在这个简单的例子中没有必要,但是如果我将更多的代码移动到 asyncio
,我相信它会变得很重要) .
Approach 2:
Python警告我,我的协同程序从未等待过 . 所以让我们等待他们 . 方法2将所有代码包装到外部异步函数中,并等待收集的结果 . 同样的问题,也立即返回(同样的警告):
# approach 2
async def main():
tasks = []
for i in range(10):
tasks.append(loop.run_in_executor(None, request_async))
gathered_tasks = asyncio.gather(*tasks)
return await gathered_tasks # <-------- here I'm waiting on the coroutine
start = perf_counter()
results = loop.run_until_complete(main())
stop = perf_counter()
print(f"finished {stop - start}") # 0.0036
这让我很困惑 . 我在等待 gather
的结果 . 直觉上应该传播到我正在收集的协同程序 . 但python仍抱怨我的协程从未等待过 .
我读了一些,发现:How could I use requests in asyncio?
这几乎就是我的例子:结合 requests
和 asyncio
. 这让我接近3:
Approach 3:
与方法2相同的结构,但是等待分别给予 run_in_executor()
的每个任务(当然这等于协同程序):
# approach 3:
# wrapping executor in coroutine
# awaiting every task individually
async def main():
tasks = []
for i in range(10):
task = loop.run_in_executor(None, request_async)
tasks.append(task)
responses = []
for task in tasks:
response = await task
responses.append(response)
return responses
start = perf_counter()
results = loop.run_until_complete(main())
stop = perf_counter()
print(f"finished {stop - start}") # 0.004578
我的问题是:我想在我的协同程序中使用阻塞代码并与执行程序并行运行它们 . 我如何得到他们的结果?
1 回答
答案是你不应该在你的协程中有阻塞代码 . 如果你必须拥有它,你必须使用
run_in_executor
隔离它 . 所以写request_async
(使用requests
)的唯一正确方法是:将
request_async
赋予run_in_executor
是注定要失败的,因为run_in_executor
的整个要点是在另一个线程中调用同步函数 . 如果你给它一个协同程序函数,它会愉快地调用它(在另一个线程中)并将返回的协同程序对象提供为"result" . 这相当于将生成器传递给需要普通函数的代码 - 是的,它会调用生成器就好了,但它不知道如何处理返回的对象 .更重要的是,你不能只将
async
放在def
之前,并期望得到一个可用的协程 . 除了等待其他异步代码之外,协程不能阻止 .现在,一旦你有一个可用的
request_async
,你就可以收集它的结果: