我试图了解如何最好地构建一个程序执行以下操作:
考虑多个分析 . 每个分析都请求来自多个数据源(REST API)的数据 . 在每次分析中,当从数据源收集所有数据时,将检查数据的一个或多个条件 . 如果满足这些条件,则会向另一个数据源发出另一个请求 .
目标是收集所有分析异步的数据,检查每个分析的条件,请求条件是否满足然后重复 . 因此,有以下要求:
-
检查数据的条件 after 在特定分析中收集所有数据,而不是在所有分析中收集数据之后 .
-
如果满足条件,则首先进行请求 - 而不是在检查所有分析的条件之后 .
-
获取数据 - >检查条件 - >可能请求某个循环被安排为每X分钟或几小时运行一次 .
我做了以下演示:
import asyncio
import random
async def get_data(list_of_data_calls):
tasks = []
for l in list_of_data_calls:
tasks.append(asyncio.ensure_future(custom_sleep(l)))
return await asyncio.gather(*tasks)
async def custom_sleep(time):
await asyncio.sleep(time)
return random.randint(0, 100)
async def analysis1_wrapper():
while True:
print("Getting data for analysis 1")
res = await get_data([5, 3])
print("Data collected for analysis 1")
for integer in res:
if integer > 80:
print("Condition analysis 1 met")
else:
print("Condition analysis 1 not met")
await asyncio.sleep(10)
async def analysis2_wrapper():
while True:
print("Getting data for analysis 2")
res = await get_data([5, 3])
print("Data collected for analysis 2")
for integer in res:
if integer > 50:
print("Condition analysis 2 met")
else:
print("Condition analysis 2 not met")
await asyncio.sleep(10)
loop = asyncio.get_event_loop()
tasks = analysis1_wrapper(), analysis2_wrapper()
loop.run_until_complete(asyncio.gather(*tasks))
loop.close()
这会产生以下输出:
Getting data for analysis 2
Getting data for analysis 1
Data collected for analysis 2
Condition analysis 2 not met
Condition analysis 2 not met
Data collected for analysis 1
Condition analysis 1 not met
Condition analysis 1 not met
Getting data for analysis 2
Getting data for analysis 1
Data collected for analysis 2
Condition analysis 2 met
Condition analysis 2 not met
Data collected for analysis 1
Condition analysis 1 not met
Condition analysis 1 not met
Getting data for analysis 2
Getting data for analysis 1
Data collected for analysis 2
Condition analysis 2 not met
Condition analysis 2 not met
Data collected for analysis 1
Condition analysis 1 not met
Condition analysis 1 not met
这看起来像我想要的那样 . 但是,由于我对asyncio和aiohttp的经验有限,我不确定这是否是一个很好的方法 . 我希望将来能够在管道中添加步骤,例如如果满足条件,则根据正在执行的请求的逻辑做某事 . 此外,它应该可以扩展到 many 分析,而不会失去太多的速度 .
1 回答
是的,基本上就是这样 . 一些要考虑的事情:
尽管您可能拥有无限数量的并发任务,但随着并发性的增加,每个任务的响应时间也会增长,吞吐量会在某个时刻停止增长甚至会减少 . 因为只有一个主线程可以执行所有操作,所以回调必须在它们太多时排队,即使网络响应在几毫秒之前到达 . 要 balancer 这一点,通常需要Semaphore来控制性能的最大并发性 .
您的代码没有显示,但担心的是条件检查可能是CPU密集型的 . 在这种情况下,您应该将任务推迟到thread pool(无GIL问题)或子进程(如果GIL是一个问题),原因有两个:1 . 阻止阻止主线程损害并发性 . 2.更有效地利用多个CPU .
对于每次分析,您当前的代码在循环中休眠10秒 . 这使得优雅地关闭分析仪变得困难,更不用说动态缩放了 . 理想的模型是producer-consumer模式,在这种模式中,您可以通过某种控制将任务生成为Queue,并且一群工作人员从队列中检索任务并同时处理它们 .