我正在使用aiohttp将一些同步代码移动到asyncio . 同步代码需要15分钟才能运行,所以我希望能够改进这一点 .
我有一些工作代码从一些网址获取数据并返回每个网址的主体 . 但这只是针对1个实验室网站,我有70个实际网站 .
因此,如果我有一个循环来创建所有网站的所有网址列表,这些网址将在列表中处理700个网址 . 现在处理它们我不认为是一个问题?
但对结果做'东西',我不知道如何编程?我已经有了代码,它会对返回的每个结果做“填充”,但我不确定如何针对正确的结果类型进行编程 .
代码运行时是否会处理所有网址并根据运行时间返回未知顺序?
我是否需要一个能处理任何类型结果的函数?
import asyncio, aiohttp, ssl
from bs4 import BeautifulSoup
def page_content(page):
return BeautifulSoup(page, 'html.parser')
async def fetch(session, url):
with aiohttp.Timeout(15, loop=session.loop):
async with session.get(url) as response:
return page_content(await response.text())
async def get_url_data(urls, username, password):
tasks = []
# Fetch all responses within one Client session,
# keep connection alive for all requests.
async with aiohttp.ClientSession(auth=aiohttp.BasicAuth(username, password)) as session:
for i in urls:
task = asyncio.ensure_future(fetch(session, i))
tasks.append(task)
responses = await asyncio.gather(*tasks)
# you now have all response bodies in this variable
for i in responses:
print(i.title.text)
return responses
def main():
username = 'monitoring'
password = '*********'
ip = '10.10.10.2'
urls = [
'http://{0}:8444/level/15/exec/-/ping/{1}/timeout/1/source/vlan/5/CR'.format(ip,'10.10.0.1'),
'http://{0}:8444/level/15/exec/-/traceroute/{1}/source/vlan/5/probe/2/timeout/1/ttl/0/10/CR'.format(ip,'10.10.0.1'),
'http://{0}:8444/level/15/exec/-/traceroute/{1}/source/vlan/5/probe/2/timeout/1/ttl/0/10/CR'.format(ip,'frontend.domain.com'),
'http://{0}:8444/level/15/exec/-/traceroute/{1}/source/vlan/5/probe/2/timeout/1/ttl/0/10/CR'.format(ip,'planner.domain.com'),
'http://{0}:8444/level/15/exec/-/traceroute/{1}/source/vlan/5/probe/2/timeout/1/ttl/0/10/CR'.format(ip,'10.10.10.1'),
'http://{0}:8444/level/15/exec/-/traceroute/{1}/source/vlan/5/probe/2/timeout/1/ttl/0/10/CR'.format(ip,'10.11.11.1'),
'http://{0}:8444/level/15/exec/-/ping/{1}/timeout/1/source/vlan/5/CR'.format(ip,'10.12.12.60'),
'http://{0}:8444/level/15/exec/-/traceroute/{1}/source/vlan/5/probe/2/timeout/1/ttl/0/10/CR'.format(ip,'10.12.12.60'),
'http://{0}:8444/level/15/exec/-/ping/{1}/timeout/1/source/vlan/5/CR'.format(ip,'lon-dc-01.domain.com'),
'http://{0}:8444/level/15/exec/-/traceroute/{1}/source/vlan/5/probe/2/timeout/1/ttl/0/10/CR'.format(ip,'lon-dc-01.domain.com'),
]
loop = asyncio.get_event_loop()
future = asyncio.ensure_future(get_url_data(urls,username,password))
data = loop.run_until_complete(future)
print(data)
if __name__ == "__main__":
main()
2 回答
这是concurrent.futures.ProcessPoolExecutor的示例 . 如果在未指定
max_workers
的情况下创建它,则实现将使用os.cpu_count
. 另请注意,asyncio.wrap_future是公开的,但没有记录 . 或者,有AbstractEventLoop.run_in_executor .您的代码离标记不远 .
asyncio.gather
按参数的顺序返回结果,因此在此处保留顺序,但不会按顺序调用page_content
.一些调整:
首先,你不需要
ensure_future
. 只有当你试图使一个协程比其父节点更长时才需要创建一个任务,即如果任务必须继续运行,即使创建它的函数已经完成 . 这里你需要的是直接用你的协同程序调用asyncio.gather
:But 调用它会同时安排所有提取,并且有大量的URL,这远非最佳 . 相反,您应该选择最大并发性,并确保最多X次提取随时都在运行 . 要实现这一点,你可以使用
asyncio.Semaphore(20)
,这个信号量只能被最多20个协同程序获取,所以其他人将等待获取直到有一个地点可用 .这样,所有提取都会立即启动,但只有20个能够获取信号量 . 其他人将在第一个
async with
指令处阻塞,并等待另一次提取完成 .我也用这里的官方asyncio替换了aiohttp.Timeout .
最后,对于数据的实际处理,如果受到CPU时间的限制,asyncio可能对您没有多大帮助 . 您需要在此处使用
ProcessPoolExecutor
将实际工作与另一个CPU并行化 .run_in_executor
可能会有用 .