首页 文章

从流中产生的正确方法是什么?

提问于
浏览
24

我有一个 Connection 对象,用于包含 asyncio 连接的读写流:

class Connection(object):

    def __init__(self, stream_in, stream_out):
        object.__init__(self)

        self.__in = stream_in
        self.__out = stream_out

    def read(self, n_bytes : int = -1):
        return self.__in.read(n_bytes)

    def write(self, bytes_ : bytes):
        self.__out.write(bytes_)
        yield from self.__out.drain()

在服务器端, connected 每次客户端连接时都会创建一个 Connection 对象,然后读取4个字节 .

@asyncio.coroutine
def new_conection(stream_in, stream_out):
    conn = Connection(stream_in, stream_out)
    data = yield from conn.read(4)
    print(data)

在客户端,写出4个字节 .

@asyncio.coroutine
def client(loop):
    ...
    conn = Connection(stream_in, stream_out)
    yield from conn.write(b'test')

这几乎按预期工作,但我必须 yield fromreadwrite 调用 . 我在 Connection 内尝试了 yield from

def read(self, n_bytes : int = -1):
    data = yield from self.__in.read(n_bytes)
    return data

但我得到的输出不是获取数据,而是获取数据

<generator object StreamReader.read at 0x1109983b8>

如果我从多个地方拨打 readwrite ,我宁愿不每次都重复 yield from ;而是将它们放在 Connection 中 . 我的最终目标是减少我的 new_conection 功能:

@asyncio.coroutine
def new_conection(stream_in, stream_out):
    conn = Connection(stream_in, stream_out)
    print(conn.read(4))

2 回答

  • 5

    因为StreamReader.read is a coroutine,你调用它的唯一选择是a)将它包装在TaskFuture中并通过事件循环运行它,b) await 使用 async def 定义的协同程序,或c)使用 yield from 从它定义为一个协程用 @asyncio.coroutine 装饰的功能 .

    由于 Connection.read 是从事件循环中调用的(通过协程 new_connection ),因此您无法重用该事件循环来为 StreamReader.readevent loops can't be started while they're already running运行 TaskFuture . 你要么必须stop the event loop(灾难性的,可能无法做到正确)或create a new event loop(凌乱和挫败使用协同程序的目的) . 这些都不可取,所以 Connection.read 需要是一个协程或 async 函数 .

    另外两个选项( async def 协程中的 await@asyncio.coroutine -decorated函数中的 yield from )大多等同 . 唯一的区别是async def and await were added in Python 3.5,因此对于3.4,使用 yield from@asyncio.coroutine 是唯一的选项(协同程序和 asyncio 在3.4之前不存在,因此其他版本无关紧要) . 就个人而言,我更喜欢使用 async defawait ,因为使用 async def 定义协同程序比使用装饰器更清晰,更清晰 .

    简而言之:将 Connection.readnew_connection 作为协同程序(使用装饰器或 async 关键字),并在调用其他协同程序时使用 await (或 yield from )( new_connection 中的 await conn.read(4)Connection.read 中的 await self.__in.read(n_bytes) ) .

  • 1

    我发现第620行的StreamReader source code的一大块实际上是该函数用法的一个完美例子 .

    在我之前的回答中,我忽略了这样一个事实,即 self.__in.read(n_bytes) 不仅是一个协程(我应该知道它是来自 asyncio 模块XD),但它会在线产生结果 . 所以它实际上是一个发电机,你需要从它产生 .

    从源代码中借用这个循环,你的read函数看起来应该是这样的:

    def read(self, n_bytes : int = -1):
        data = bytearray() #or whatever object you are looking for
        while 1:
            block = yield from self.__in.read(n_bytes)
            if not block:
                break
            data += block
        return data
    

    因为 self.__in.read(n_bytes) 是一个生成器,所以你必须继续从它产生,直到它产生一个空结果来表示读取结束 . 现在你的read函数应该返回数据而不是生成器 . 您不必从此版本的 conn.read() 中获益 .

相关问题