首页 文章

使用asyncio和aiohttp实现异步程序的最佳架构

提问于
浏览
0

我试图了解如何最好地构建一个程序执行以下操作:

考虑多个分析 . 每个分析都请求来自多个数据源(REST API)的数据 . 在每次分析中,当从数据源收集所有数据时,将检查数据的一个或多个条件 . 如果满足这些条件,则会向另一个数据源发出另一个请求 .

目标是收集所有分析异步的数据,检查每个分析的条件,请求条件是否满足然后重复 . 因此,有以下要求:

  • 检查数据的条件 after 在特定分析中收集所有数据,而不是在所有分析中收集数据之后 .

  • 如果满足条件,则首先进行请求 - 而不是在检查所有分析的条件之后 .

  • 获取数据 - >检查条件 - >可能请求某个循环被安排为每X分钟或几小时运行一次 .

我做了以下演示:

import asyncio
import random


async def get_data(list_of_data_calls):
    tasks = []
    for l in list_of_data_calls:
        tasks.append(asyncio.ensure_future(custom_sleep(l)))
    return await asyncio.gather(*tasks)


async def custom_sleep(time):
    await asyncio.sleep(time)
    return random.randint(0, 100)


async def analysis1_wrapper():
    while True:
        print("Getting data for analysis 1")
        res = await get_data([5, 3])
        print("Data collected for analysis 1")
        for integer in res:
            if integer > 80:
                print("Condition analysis 1 met")
            else:
                print("Condition analysis 1 not met")
        await asyncio.sleep(10)


async def analysis2_wrapper():
    while True:
        print("Getting data for analysis 2")
        res = await get_data([5, 3])
        print("Data collected for analysis 2")
        for integer in res:
            if integer > 50:
                print("Condition analysis 2 met")
            else:
                print("Condition analysis 2 not met")
        await asyncio.sleep(10)


loop = asyncio.get_event_loop()
tasks = analysis1_wrapper(), analysis2_wrapper()
loop.run_until_complete(asyncio.gather(*tasks))
loop.close()

这会产生以下输出:

Getting data for analysis 2
Getting data for analysis 1
Data collected for analysis 2
Condition analysis 2 not met
Condition analysis 2 not met
Data collected for analysis 1
Condition analysis 1 not met
Condition analysis 1 not met
Getting data for analysis 2
Getting data for analysis 1
Data collected for analysis 2
Condition analysis 2 met
Condition analysis 2 not met
Data collected for analysis 1
Condition analysis 1 not met
Condition analysis 1 not met
Getting data for analysis 2
Getting data for analysis 1
Data collected for analysis 2
Condition analysis 2 not met
Condition analysis 2 not met
Data collected for analysis 1
Condition analysis 1 not met
Condition analysis 1 not met

这看起来像我想要的那样 . 但是,由于我对asyncio和aiohttp的经验有限,我不确定这是否是一个很好的方法 . 我希望将来能够在管道中添加步骤,例如如果满足条件,则根据正在执行的请求的逻辑做某事 . 此外,它应该可以扩展到 many 分析,而不会失去太多的速度 .

1 回答

  • 1

    是的,基本上就是这样 . 一些要考虑的事情:

    • 并发限制 .

    尽管您可能拥有无限数量的并发任务,但随着并发性的增加,每个任务的响应时间也会增长,吞吐量会在某个时刻停止增长甚至会减少 . 因为只有一个主线程可以执行所有操作,所以回调必须在它们太多时排队,即使网络响应在几毫秒之前到达 . 要 balancer 这一点,通常需要Semaphore来控制性能的最大并发性 .

    • CPU密集型操作 .

    您的代码没有显示,但担心的是条件检查可能是CPU密集型的 . 在这种情况下,您应该将任务推迟到thread pool(无GIL问题)或子进程(如果GIL是一个问题),原因有两个:1 . 阻止阻止主线程损害并发性 . 2.更有效地利用多个CPU .

    • 任务控制 .

    对于每次分析,您当前的代码在循环中休眠10秒 . 这使得优雅地关闭分析仪变得困难,更不用说动态缩放了 . 理想的模型是producer-consumer模式,在这种模式中,您可以通过某种控制将任务生成为Queue,并且一群工作人员从队列中检索任务并同时处理它们 .

相关问题