首页 文章

如何在单元测试结束前取消asyncio Task?

提问于
浏览
1

我正在尝试对异步套接字服务器进行单元测试,并使用 pytest-asyncio 使pytest与异步代码库兼容 . 服务器一旦启动,总是通过while循环发送回复,并且可能花费大部分时间等待 client_loop() 中的传入消息 . 问题是在单元测试框架终止事件循环并发出此警告之前,无法取消此任务:

任务被破坏但尚待批准!任务:<任务挂起coro = <Server.new_client()完成,定义在/ [... path ...] / server.py:16> wait_for = <Future pending cb = [<TaskWakeupMethWrapper object at 0x106d7cbe8>() ] >>

我可以访问的唯一任务是 asyncio.create_task() 创建的任务,这似乎不是同一个任务 . 那个任务看起来像这样:

task:<任务挂起coro = <start_server()运行在/usr/local/Cellar/python/[...different path ...] / streams.py:86 >>

因此,在此任务上调用 task.cancel(); await task.wait_cancelled() 无效 .

如何编写单元测试以便为每个测试干净地启动和启动服务器,而不是切断可能仍在运行的任务?

这是一个例子:

test_server.py

import pytest
import asyncio

@pytest.fixture
async def server(event_loop):
    from server import Server
    the_server = Server()
    await the_server.start()
    yield the_server
    the_server.stop()

@pytest.mark.asyncio
async def test_connect(server):
    loop = asyncio.get_event_loop()
    reader, writer = await asyncio.open_connection('0.0.0.0', 8888, loop = loop)
    writer.write(b'something')
    await reader.read(100)
    writer.write(b'something else')
    await reader.read(100)
    assert 1

server.py

import asyncio

class Server():
    async def start(self):
        loop = asyncio.get_event_loop()
        coro = asyncio.start_server(self.new_client, '0.0.0.0', 8888, loop = loop)
        task = loop.create_task(coro)
        print('\n')
        print(task)
        self.server = await task

    def stop(self):
        self.server.close()

    async def new_client(self, reader, writer):
        await self.client_loop(reader, writer)

    async def client_loop(self, reader, writer):
        while True:
            await reader.read(100)
            writer.write(b'reply')

如果你想运行这个例子,只需运行 pip3 install pytest-asyncio 并且pytest可以选择这个插件 .

2 回答

  • 0

    调用 self.server.close() 后,你必须 await self.server.wait_closed() .

    因此,您的灯具应如下所示:

    @pytest.fixture
    async def server(event_loop):
        from server import Server
        the_server = Server()
        await the_server.start()
        yield the_server
        await the_server.stop()
    

    Serverstop 方法应该如下所示:

    async def stop(self):
            self.server.close()
            await self.server.wait_closed()
    

    有关详细信息,请参阅the documentation .

  • 4

    asyncio.Server.stop() 方法不会完全停止服务器 . 它只是停止接受新的连接 . 关闭前创建的任何连接将继续执行直到完成 .

    根据the documentation(强调我的):

    停止服务:关闭侦听套接字并将套接字属性设置为None . 表示现有传入客户端连接的套接字保持打开状态 . 服务器异步关闭,使用wait_closed()协程等待服务器关闭 .

    在此示例中,所有连接都发送到无限 client_loop 方法 .

    更好的解决方案是在 new_client() 中创建一个任务集合,负责执行 client_loop() 逻辑而不是直接等待该方法 . 使用此方法,可以在 stop() 方法中彻底终止所有打开的任务 .

相关问题