首页 文章

asyncio:从执行程序中的异步函数收集结果

提问于
浏览
1

一旦所有HTTP请求都返回,我想启动大量HTTP请求并收集结果 . 使用 asyncio 可以以非阻塞方式发送请求,但我在收集结果时遇到问题 .

我知道针对这个特定问题的解决方案,如aiohttp . 但HTTP请求只是一个例子,我的问题是如何正确使用 asyncio .

在服务器端,我有一个烧瓶,用"Hello World!"回复 localhost/ 的每个请求,但它在回答之前等待0.1秒 . 在我的所有例子中,我发送了10个请求 . 同步代码大约需要1秒钟,异步版本可以在0.1秒内完成 .

在客户端,我想同时启动许多请求并收集他们的结果 . 我试图以三种不同的方式做到这一点 . 由于asyncio需要执行程序来解决阻塞代码,因此所有方法都调用 loop.run_in_executor .

这段代码在它们之间共享:

import requests
from time import perf_counter
import asyncio

loop = asyncio.get_event_loop()

async def request_async():
    r = requests.get("http://127.0.0.1:5000/")
    return r.text

def request_sync():
    r = requests.get("http://127.0.0.1:5000/")
    return r.text

Approach 1:

在任务列表中使用 asyncio.gather() ,然后 run_until_complete . 阅读Asyncio.gather vs asyncio.wait之后,似乎聚集会等待结果 . 但它没有't. So this code returns instantly, without waiting for the requests to finish. If I use a blocking function here, this works. Why can'我使用异步功能?

# approach 1
start = perf_counter()
tasks = []
for i in range(10):
    tasks.append(loop.run_in_executor(None, request_async)) # <---- using async function !

gathered_tasks = asyncio.gather(*tasks)
results = loop.run_until_complete(gathered_tasks)
stop = perf_counter()
print(f"finished {stop - start}") # 0.003

# approach 1(B)
start = perf_counter()
tasks = []
for i in range(10):
    tasks.append(loop.run_in_executor(None, request_sync)) # <---- using sync function

gathered_tasks = asyncio.gather(*tasks)
results = loop.run_until_complete(gathered_tasks)

stop = perf_counter()
print(f"finished {stop - start}") # 0.112

Python甚至警告我, coroutine "request_async" 从未等待过 . 此时,我有一个可行的解决方案:在执行程序中使用普通(非异步)函数 . 但我希望有一个适用于 async 函数定义的解决方案 . 因为我想在它们里面使用 await (在这个简单的例子中没有必要,但是如果我将更多的代码移动到 asyncio ,我相信它会变得很重要) .

Approach 2:

Python警告我,我的协同程序从未等待过 . 所以让我们等待他们 . 方法2将所有代码包装到外部异步函数中,并等待收集的结果 . 同样的问题,也立即返回(同样的警告):

# approach 2
async def main():

    tasks = []
    for i in range(10):
        tasks.append(loop.run_in_executor(None, request_async))

    gathered_tasks = asyncio.gather(*tasks)

    return await gathered_tasks # <-------- here I'm waiting on the coroutine 

start = perf_counter()
results = loop.run_until_complete(main())
stop = perf_counter()
print(f"finished {stop - start}")  # 0.0036

这让我很困惑 . 我在等待 gather 的结果 . 直觉上应该传播到我正在收集的协同程序 . 但python仍抱怨我的协程从未等待过 .

我读了一些,发现:How could I use requests in asyncio?

这几乎就是我的例子:结合 requestsasyncio . 这让我接近3:

Approach 3:

与方法2相同的结构,但是等待分别给予 run_in_executor() 的每个任务(当然这等于协同程序):

# approach 3:
# wrapping executor in coroutine
# awaiting every task individually
async def main():

    tasks = []
    for i in range(10):
        task = loop.run_in_executor(None, request_async)
        tasks.append(task)

    responses = []
    for task in tasks:
        response = await task
        responses.append(response)

    return responses

start = perf_counter()
results = loop.run_until_complete(main())
stop = perf_counter()

print(f"finished {stop - start}") # 0.004578

我的问题是:我想在我的协同程序中使用阻塞代码并与执行程序并行运行它们 . 我如何得到他们的结果?

1 回答

  • 3

    我的问题是:我想在我的协同程序中使用阻塞代码并与执行程序并行运行它们 . 我如何得到他们的结果?

    答案是你不应该在你的协程中有阻塞代码 . 如果你必须拥有它,你必须使用 run_in_executor 隔离它 . 所以写 request_async (使用 requests )的唯一正确方法是:

    async def request_async():
        loop = asyncio.get_event_loop()
        return await loop.run_in_executor(None, request_sync)
    

    request_async 赋予 run_in_executor 是注定要失败的,因为 run_in_executor 的整个要点是在另一个线程中调用同步函数 . 如果你给它一个协同程序函数,它会愉快地调用它(在另一个线程中)并将返回的协同程序对象提供为"result" . 这相当于将生成器传递给需要普通函数的代码 - 是的,它会调用生成器就好了,但它不知道如何处理返回的对象 .

    更重要的是,你不能只将 async 放在 def 之前,并期望得到一个可用的协程 . 除了等待其他异步代码之外,协程不能阻止 .

    现在,一旦你有一个可用的 request_async ,你就可以收集它的结果:

    async def main():
        tasks = [request_async() for _i in range(10)]
        results = await asyncio.gather(*tasks)
        return results
    
    results = loop.run_until_complete(main())
    

相关问题