我希望在编写日志文件时阅读它们并使用asyncio处理它们的输入 . 代码必须在Windows上运行 . 根据我在stackoverflow和web上搜索的理解,异步文件I / O在大多数操作系统上都很棘手(例如, select
将无法正常工作) . 虽然我确信我可以用其他方法(例如线程)做到这一点,但我会尝试使用asyncio来查看它是什么样的 . 最有用的答案可能是描述这个问题的解决方案应该是什么样子的答案,即应该如何调用或调度不同的函数和协同程序 .
下面给我一个生成器,逐行读取文件(通过轮询,这是可以接受的):
import time
def line_reader(f):
while True:
line = f.readline()
if not line:
time.sleep(POLL_INTERVAL)
continue
process_line(line)
有几个要监视和处理的文件,这种代码需要线程 . 我稍微修改了它以便更适用于asyncio:
import asyncio
def line_reader(f):
while True:
line = f.readline()
if not line:
yield from asyncio.sleep(POLL_INTERVAL)
continue
process_line(line)
当我通过asyncio事件循环安排它时,这种工作,但如果 process_data
块,那当然不好 . 在开始时,我想象解决方案看起来像
def process_data():
...
while True:
...
line = yield from line_reader()
...
但我无法弄清楚如何使这项工作(至少没有 process_data
管理相当多的状态) .
关于如何构建这种代码的任何想法?
4 回答
您的代码结构对我来说很好,以下代码在我的机器上运行正常:
使用aiofiles:
EDIT 1
正如@Jashandeep所提到的,你应该关心阻塞操作:
另一种方法是
select
和/或epoll
:timeout
参数在这里很重要 .见:https://docs.python.org/3/library/select.html#select.select
EDIT 2
您可以使用以下命令注册文件以进行读/写:loop.add_reader()
它在循环内部使用内部EPOLL处理程序 .
asyncio
还不支持文件操作,抱歉 .因此它无法解决您的问题 .
asyncio
是基于* nix系统的select
,因此如果不使用线程,您将无法执行非阻塞文件I / O.在Windows上,asyncio
可以使用IOCP,它支持非阻塞文件I / O,但asyncio
不支持此功能 .你的代码很好,除了你应该在线程中阻止I / O调用,这样你就不会使用
loop.run_in_executor
函数将工作卸载到线程上 .首先,为您的I / O设置一个专用的线程池:
然后只需卸载对执行程序的任何阻塞I / O调用: