首页 文章

如何将FileTransport干净地添加到Asyncio?

提问于
浏览
0

我'm writing an application which reads text data and acts on it. The text data could come from a TCP port, or from a text file (which contains data earlier read from the TCP port and archived). I'在Python 3中编写它,使用asyncio似乎是一个明显的工具 .

使用Streams API open_connection()打开TCP端口并从中读取是很简单的 . 对于输入输出的低层和高层, asyncio 架构具有TransportProtocol的概念 . 因此,我似乎应该实现一个Transport来从文件中读取文本,并将其传递给Protocol . 这将让我保持我的应用程序的其余部分与文本数据是来自TCP端口还是文件分离 .

但是我很难弄清楚如何告诉 asyncio 使用我喜欢的传输 .

  • Streams API open_connection()有一个参数列表,该列表全部与TCP端口传输有关,无法指定不同的传输,更不用说文件路径等参数 .

  • open_connection() 转身并调用loop.create_connection() . 这与TCP端口传输一样专用 . 现在仍然提供不同的运输方式 .

  • loop.create_connection() 的实现从 self._make_ssl_transport()self._make_socket_transport() 获取其传输对象 . 这些在 asyncio.selector_events.BaseSelectorEventLoopasyncio.proactor_events.BaseProactorEventLoop 中有替代实现,所以我们显然已经超过了应该选择文件传输的程度 .

我错过了一些 asyncio 让我告诉它使用什么运输的地方?或者是 asyncio 真正编码到其根源使用自己的TCP端口和UDP数据报传输,没有别的?

如果我想允许使用我自己的Transport与 asyncio 的可能性,看起来我必须扩展事件循环,或者编写更多灵活的替代 create_connection() ,它被编码到特定的事件循环实现 . 这似乎很多工作,并且易受实施变化的影响 .

或者,使用Transport处理文件输入是一件愚蠢的事吗?我应该构建我的代码来说:

if (using_tcp_port): await asyncio.open_connection(....) else: completely_different_file_implementation(....)

1 回答

  • 2

    根据 API create_connection()documentation,它需要一个协议并创建一个 streaming 传输,这是一个TCP连接 . 所以它不应该是自定义传输的API .

    但是,为TCP传输或自定义文件传输重用相同协议的想法是有效的 . 它不会是"completely different implementation",但至少不使用 create_connection() . 我们假设它是 read_file()

    def my_protocol_factory():
        return your_protocol
    
    if using_tcp_port:
        transport, protocol = await loop.create_connection(my_protocol_factory, host, port)
    else:
        transport, protocol = await read_file(loop, my_protocol_factory, path_to_file)
    

    然后你会有这样的事情:

    from asyncio import transports
    
    import aiofiles  # https://github.com/Tinche/aiofiles
    
    
    def read_file(loop, protocol_factory, path):
        protocol = protocol_factory()
        transport = FileTransport(path, loop)
        transport.set_protocol(protocol)
        return transport, protocol
    
    
    class FileTransport(transports.ReadTransport):
        def __init__(self, path, loop):
            super().__init__()
            self._path = path
            self._loop = loop
            self._closing = False
    
        def is_closing(self):
            return self._closing
    
        def close(self):
            self._closing = True
    
        def set_protocol(self, protocol):
            self._protocol = protocol
            self._loop.create_task(self._do_read())
    
        def get_protocol(self):
            return self._protocol
    
        async def _do_read(self):
            try:
                async with aiofiles.open(self._path) as f:
                    self._loop.call_soon(self._protocol.connection_made, self)
                    async for line in f:
                        self._loop.call_soon(self._protocol.data_received, line)
                        if self._closing:
                            break
                    self._loop.call_soon(self._protocol.eof_received)
            except Exception as ex:
                self._loop.call_soon(self._protocol.connection_lost, ex)
            else:
                self._loop.call_soon(self._protocol.connection_lost, None)
    

相关问题