首页 文章

Python3 Asyncio从初始连接创建辅助连接

提问于
浏览
0

我试图添加两个协同程序到asyncio循环并得到一个错误:

RuntimeError:此事件循环已在运行

我的目标是与服务器通信(我无法控制) . 此服务器需要来自客户端的初始连接 . 然后,服务器在此连接上为客户端提供了一个端口 . 客户端必须使用此端口来创建第二个连接 . 服务器使用第二个连接将未经请求的消息发送到客户端 . 对于其他双向通信,第一个连接始终保持不变 .

要重新创建此方案,我有一些代码可以重现错误:

class Connection():
    def __init__(self, ip, port, ioloop):
        self.ip = ip
        self.port = port
        self.ioloop = ioloop
        self.reader, self.writer = None, None
        self.protocol = None
        self.fileno = None

    async def __aenter__(self):
        # Applicable when doing 'with Connection(...'
        log.info("Entering and Creating Connection")
        self.reader, self.writer = (
            await asyncio.open_connection(self.ip, self.port, loop=self.ioloop)
        )
        self.protocol = self.writer.transport.get_protocol()
        self.fileno = self.writer.transport.get_extra_info('socket').fileno()

        log.info(f"Created connection {self}")
        return self

    async def __aexit__(self, *args):
        # Applicable when doing 'with Connection(...'
        log.info(f"Exiting and Destroying Connection {self}")
        if self.writer:
            self.writer.close()

    def __await__(self):
        # Applicable when doing 'await Connection(...'
        return self.__aenter__().__await__()

    def __repr__(self):
        return f"[Connection {self.ip}:{self.port}, {self.protocol}, fd={self.fileno}]"

    async def send_recv_message(self, message):
        log.debug(f"send: '{message}'")
        self.writer.write(message.encode())
        await self.writer.drain()

        log.debug("awaiting data...")
        data = await self.reader.read(9999)
        data = data.decode()
        log.debug(f"recv: '{data}'")
        return data


class ServerConnection(Connection):
    async def setup_connection(self):
        event_port = 8889  # Assume this came from the server
        print("In setup connection")
        event_connection = await EventConnection('127.0.0.1', event_port, self.ioloop)
        self.ioloop.run_until_complete(event_connection.recv_message())

class EventConnection(Connection):
    async def recv_message(self):
        log.debug("awaiting recv-only data...")
        data = await self.reader.read(9999)
        data = data.decode()
        log.debug(f"recv only: '{data}'")
        return data


async def main(loop):
    client1 = await ServerConnection('127.0.0.1', 8888, loop)
    await client1.setup_connection()
    await client1.send_recv_message("Hello1")
    await client1.send_recv_message("Hello2")
    await asyncio.sleep(5)

if __name__ == '__main__':
    #logging.basicConfig(level=logging.INFO)
    logging.basicConfig(level=logging.DEBUG)
    log = logging.getLogger()
    ioloop = asyncio.get_event_loop()
    print('starting loop')
    ioloop.run_until_complete(main(ioloop))
    print('completed loop')
    ioloop.close()

在调用run_until_complete的ServerConnection.setup_connection()方法中发生错误 .

由于缺乏对asyncio的理解,我可能做错了 . 基本上,如何设置辅助连接,在设置第一个连接时会收到事件通知(未经请求)?

谢谢 .

Followup

由于代码非常相似(为了添加更多功能而进行了一些更改),我希望跟进原始帖子并不是很糟糕,因为产生的错误仍然是相同的 .

新问题是当它收到未经请求的消息(由EventConnection接收)时,recv_message调用process_data方法 . 我想让process_data成为未来,以便recv_message完成(ioloop应该停止) . 然后,ensure_future将获取它并继续再次运行以使用ServerConnection对服务器执行请求/响应 . 在此之前,它必须转到一些用户代码(由external_command()表示,我希望从中隐藏异步内容) . 这将使它再次同步 . 因此,一旦他们完成了他们需要的工作,他们应该在ServerConnection上调用execute_command,然后再次启动循环 .

问题是,我对使用ensure_future的期望没有消失,因为看起来循环没有停止运行 . 因此,当代码执行到达执行run_until_complete的execute_command时,会发生错误“此事件循环已在运行”的异常 .

我有两个问题:

  • 如何使ioloop在process_data放入ensure_future之后停止,然后能够在execute_command中再次运行?

  • 一旦recv_message收到了什么,我们怎样才能使它能够接收更多未经请求的数据?仅使用ensure_future再次调用自己是否足够/安全?

这是模拟此问题的示例代码 .

client1 = None

class ServerConnection(Connection):
    connection_type = 'Server Connection'
    async def setup_connection(self):
        event_port = 8889  # Assume this came from the server
        print("In setup connection")
        event_connection = await EventConnection('127.0.0.1', event_port, self.ioloop)
        asyncio.ensure_future(event_connection.recv_message())

    async def _execute_command(self, data):
        return await self.send_recv_message(data)

    def execute_command(self, data):
        response_str = self.ioloop.run_until_complete(self._execute_command(data))
        print(f"exec cmd response_str: {response_str}")

    def external_command(self, data):
        self.execute_command(data)


class EventConnection(Connection):
    connection_type = 'Event Connection'
    async def recv_message(self):
        global client1
        log.debug("awaiting recv-only data...")
        data = await self.reader.read(9999)
        data = data.decode()
        log.debug(f"recv-only: '{data}'")
        asyncio.ensure_future(self.process_data(data))
        asyncio.ensure_future(self.recv_message())

    async def process_data(self, data):
        global client1
        await client1.external_command(data)


async def main(ioloop):
    global client1
    client1 = await ServerConnection('127.0.0.1', 8888, ioloop)
    await client1.setup_connection()
    print(f"after connection setup loop running is {ioloop.is_running()}")
    await client1.send_recv_message("Hello1")
    print(f"after Hello1 loop running is {ioloop.is_running()}")
    await client1.send_recv_message("Hello2")
    print(f"after Hello2 loop running is {ioloop.is_running()}")
    while True:
        print(f"inside while loop running is {ioloop.is_running()}")
        t = 10
        print(f"asyncio sleep {t} sec")
        await asyncio.sleep(t)


if __name__ == '__main__':
    logging.basicConfig(level=logging.DEBUG)
    log = logging.getLogger()
    ioloop = asyncio.get_event_loop()
    print('starting loop')
    ioloop.run_until_complete(main(ioloop))
    print('completed loop')
    ioloop.close()

1 回答

  • 0

    尝试更换:

    self.ioloop.run_until_complete
    

    await
    

相关问题