
Making 1 million requests with aiohttp / asyncio - literally

I followed this tutorial: https://pawelmhm.github.io/asyncio/python/aiohttp/2016/04/22/asyncio-aiohttp.html and everything worked fine when I made 50,000 requests. But I need to make 1 million API calls, and with this code I run into a problem:

url = "http://some_url.com/?id={}"
    tasks = set()

    sem = asyncio.Semaphore(MAX_SIM_CONNS)
    for i in range(1, LAST_ID + 1):
        task = asyncio.ensure_future(bound_fetch(sem, url.format(i)))
        tasks.add(task)

    responses = asyncio.gather(*tasks)
    return await responses

Because Python needs to create 1 million tasks, it basically just lags and then prints a Killed message in the terminal. Is there a way to use a generator instead of a pre-made set (or list) of URLs? Thanks.

2 Answers

  • 2

    asyncio is memory-bound (like any other program). You cannot spawn more tasks than your memory can hold. My guess is that you hit a memory limit. Check dmesg for more information.

    1 million RPS does not mean there are 1M tasks. A single task can perform several requests in the same second.
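
    For example, here is a minimal sketch of that idea (the localhost URL pattern and the fetch helper are placeholder assumptions, not the questioner's code): a fixed pool of worker tasks can cover any number of requests, because each task issues its requests one after another.

    import asyncio
    from aiohttp import ClientSession

    NUM_WORKERS = 50                        # fixed pool size, independent of the request count
    TOTAL_REQUESTS = 10**6
    URL = "http://localhost:8080/?id={}"    # placeholder URL pattern

    async def fetch(session, url):
        async with session.get(url) as response:
            return await response.read()

    async def worker(session, worker_id):
        # a single task performs many requests, one after another
        for i in range(worker_id, TOTAL_REQUESTS, NUM_WORKERS):
            await fetch(session, URL.format(i + 1))

    async def main():
        async with ClientSession() as session:
            # only NUM_WORKERS Task objects exist, however large TOTAL_REQUESTS is
            await asyncio.gather(*(worker(session, w) for w in range(NUM_WORKERS)))

    if __name__ == "__main__":
        asyncio.run(main())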

  • 1

    Schedule all 1 million tasks at once

    This is the code you are talking about. It takes up to 3 GB of RAM, so it can easily be killed by the operating system if you have little free memory.

    import asyncio
    from aiohttp import ClientSession
    
    MAX_SIM_CONNS = 50
    LAST_ID = 10**6
    
    async def fetch(url, session):
        async with session.get(url) as response:
            return await response.read()
    
    async def bound_fetch(sem, url, session):
        async with sem:
            await fetch(url, session)
    
    async def fetch_all():
        url = "http://localhost:8080/?id={}"
        tasks = set()
        async with ClientSession() as session:
            sem = asyncio.Semaphore(MAX_SIM_CONNS)
            # one Task object per URL is created up front (10**6 in total)
            for i in range(1, LAST_ID + 1):
                task = asyncio.create_task(bound_fetch(sem, url.format(i), session))
                tasks.add(task)
            return await asyncio.gather(*tasks)
    
    if __name__ == '__main__':
        asyncio.run(fetch_all())
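
    Note that memory here grows with LAST_ID: all 10**6 Task objects (each with its coroutine frame and pending URL string) exist at the same time, which is presumably where the roughly 3 GB comes from.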
    

    Simplify the work using a queue

    Here is my suggestion: use asyncio.Queue to pass URLs to worker tasks. The queue is filled as needed; there is no pre-made list of URLs.

    It needs only 30 MB of RAM :)

    import asyncio
    from aiohttp import ClientSession
    
    MAX_SIM_CONNS = 50
    LAST_ID = 10**6
    
    async def fetch(url, session):
        async with session.get(url) as response:
            return await response.read()
    
    async def fetch_worker(url_queue):
        async with ClientSession() as session:
            while True:
                url = await url_queue.get()
                try:
                    if url is None:
                        # all work is done
                        return
                    response = await fetch(url, session)
                    # ...do something with the response
                finally:
                    url_queue.task_done()
                    # calling task_done() is necessary for the url_queue.join() to work correctly
    
    async def fetch_all():
        url = "http://localhost:8080/?id={}"
        url_queue = asyncio.Queue(maxsize=100)
        worker_tasks = []
        for i in range(MAX_SIM_CONNS):
            wt = asyncio.create_task(fetch_worker(url_queue))
            worker_tasks.append(wt)
        for i in range(1, LAST_ID + 1):
            await url_queue.put(url.format(i))
        for i in range(MAX_SIM_CONNS):
            # tell the workers that the work is done
            await url_queue.put(None)
        await url_queue.join()
        await asyncio.gather(*worker_tasks)
    
    if __name__ == '__main__':
        asyncio.run(fetch_all())
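
    A note on why this stays around 30 MB: asyncio.Queue(maxsize=100) makes put() wait whenever 100 URLs are already queued, so at any moment only roughly 100 pending URLs plus MAX_SIM_CONNS in-flight requests exist, no matter how large LAST_ID is. The None sentinels (one per worker) tell each worker to exit its loop, and url_queue.join() returns only once every enqueued item has been matched by a task_done() call.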
    
