首页 文章

使用asyncio逐行读取文件

提问于
浏览
14

我希望在编写日志文件时阅读它们并使用asyncio处理它们的输入 . 代码必须在Windows上运行 . 根据我在stackoverflow和web上搜索的理解,异步文件I / O在大多数操作系统上都很棘手(例如, select 将无法正常工作) . 虽然我确信我可以用其他方法(例如线程)做到这一点,但我会尝试使用asyncio来查看它是什么样的 . 最有用的答案可能是描述这个问题的解决方案应该是什么样子的答案,即应该如何调用或调度不同的函数和协同程序 .

下面给我一个生成器,逐行读取文件(通过轮询,这是可以接受的):

import time

def line_reader(f):
    while True:
        line = f.readline()
        if not line:
            time.sleep(POLL_INTERVAL)
            continue
        process_line(line)

有几个要监视和处理的文件,这种代码需要线程 . 我稍微修改了它以便更适用于asyncio:

import asyncio

def line_reader(f):
    while True:
        line = f.readline()
        if not line:
            yield from asyncio.sleep(POLL_INTERVAL)
            continue
        process_line(line)

当我通过asyncio事件循环安排它时,这种工作,但如果 process_data 块,那当然不好 . 在开始时,我想象解决方案看起来像

def process_data():
    ...
    while True:
        ...
        line = yield from line_reader()
        ...

但我无法弄清楚如何使这项工作(至少没有 process_data 管理相当多的状态) .

关于如何构建这种代码的任何想法?

4 回答

  • 2

    您的代码结构对我来说很好,以下代码在我的机器上运行正常:

    import asyncio
    
    PERIOD = 0.5
    
    @asyncio.coroutine
    def readline(f):
        while True:
            data = f.readline()
            if data:
                return data
            yield from asyncio.sleep(PERIOD)
    
    @asyncio.coroutine
    def test():
        with open('test.txt') as f:
            while True:
                line = yield from readline(f)
                print('Got: {!r}'.format(line))
    
    loop = asyncio.get_event_loop()
    loop.run_until_complete(test())
    
  • 0

    使用aiofiles

    async with aiofiles.open('filename', mode='r') as f:
        async for line in f:
            print(line)
    

    EDIT 1

    正如@Jashandeep所提到的,你应该关心阻塞操作:

    另一种方法是 select 和/或 epoll

    from select import select
    
    files_to_read, files_to_write, exceptions = select([f1, f2], [f1, f2], [f1, f2], timeout=.1)
    

    timeout 参数在这里很重要 .

    见:https://docs.python.org/3/library/select.html#select.select

    EDIT 2

    您可以使用以下命令注册文件以进行读/写:loop.add_reader()

    它在循环内部使用内部EPOLL处理程序 .

  • 13

    asyncio 还不支持文件操作,抱歉 .

    因此它无法解决您的问题 .

  • 20

    根据我从stackoverflow和web搜索的理解,异步文件I / O在大多数操作系统上都很棘手(例如,select不会按预期工作) . 虽然我确信我可以用其他方法(例如线程)做到这一点,但我会尝试使用asyncio来查看它是什么样的 .

    asyncio 是基于* nix系统的 select ,因此如果不使用线程,您将无法执行非阻塞文件I / O.在Windows上, asyncio 可以使用IOCP,它支持非阻塞文件I / O,但 asyncio 不支持此功能 .

    你的代码很好,除了你应该在线程中阻止I / O调用,这样你就不会使用 loop.run_in_executor 函数将工作卸载到线程上 .

    首先,为您的I / O设置一个专用的线程池:

    from concurrent.futures import ThreadPoolExecutor
    io_pool_exc = ThreadPoolExecutor()
    

    然后只需卸载对执行程序的任何阻塞I / O调用:

    ...
    line = yield from loop.run_in_executor(io_pool_exc, f.readline)
    ...
    

相关问题