首页 文章

不确定如何取消长时间运行的asyncio任务

提问于
浏览
2

我正在开发一个与Web服务交互的CLI . 运行时,它将尝试与之 Build 通信,发送请求,接收和处理回复,然后终止 . 我在代码的各个部分使用协同程序和asyncio来驱动它们 . 我想要的是能够执行所有这些步骤,然后让所有协同程序在最后彻底终止(即以不会导致asyncio抱怨的方式) . 不幸的是,我发现asyncio比C#等其他语言的异步性更难以使用和理解 .

我定义了一个类,它通过websocket连接处理与Web服务的所有直接通信:

class CommunicationService(object):
    def __init__(self):
        self._ws = None
        self._listen_task = None
        self._loop = asyncio.get_event_loop()

    ...

    async def _listen_for_responses(self):
        logging.debug('Listening for responses')
        while True:
            message = await self._ws.recv()
            self.__received_response.on_next(message)

    def establish_communication(self, hostname: str, port: int) -> None:
        websocket_address = 'ws://{}:{}/'.format(hostname, port)

        self._ws = self._loop.run_until_complete(websockets.connect(websocket_address))
        self._listen_task = asyncio.ensure_future(self._listen_for_responses())

    def send_request(self, request: str) -> None:
        return asyncio.ensure_future(self._ws.send(request))

    def stop(self):
        if self._listen_task:
            self._loop.call_soon_threadsafe(self._listen_task.cancel)

        if self._ws and self._ws.open:
            self._ws.close()

该类使用websocketsRxPY库 . Build 通信时,此类的实例将创建一个无限期运行的任务,该任务将等待来自Web服务的响应并将其发布到RxPY主题上 .

我在主CLI方法中运行 CommunicationService.establish_communication

def cli(context, hostname, port):
    log_level = context.meta['click_log.core.logger']['level']
    _initialize_logging(log_level)

    # Create a new event loop for processing commands asynchronously on.
    loop = asyncio.new_event_loop()
    loop.set_debug(log_level == logging.DEBUG)
    asyncio.set_event_loop(loop)

    # Establish communication with TestCube Web Service.
    context.comms = CommunicationService()
    context.comms.establish_communication(hostname, port)
    ...

根据提供的CLI参数,这可能会调用子命令回调,我将其作为协程函数实现 .

然后我注册一个函数来处理被调用的子命令的结果,这些子命令将是 None 或一个协程对象:

@cli.resultcallback()
@click.pass_context
def _handle_command_task(context, task: Coroutine, **_) -> None:
    if task:
        loop = asyncio.get_event_loop()
        result = loop.run_until_complete(task)
        context.comms.stop()
        loop.close()
        if result:
            print(result, end='')

我的程序有效,但我得到以下输出(在INFO日志级别运行CLI时):

$ testcube relays.0.enabled false
2016-09-06 12:33:51,157 [INFO    ] testcube.comms - Establishing connection to TestCube Web Service @ 127.0.0.1:36364
2016-09-06 12:33:51,219 [ERROR   ] asyncio - Task was destroyed but it is pending!
task: <Task pending coro=<CommunicationService._listen_for_responses() running at c:\users\davidfallah\pycharmprojects\testcube\testcube\comms.py:34> wait_for=<Future pending cb=[Task._wakeup()]>>
2016-09-06 12:33:51,219 [ERROR   ] asyncio - Task was destroyed but it is pending!
task: <Task pending coro=<WebSocketCommonProtocol.run() running at c:\users\davidfallah\appdata\local\programs\python\python35-32\lib\site-packages\websockets\protocol.py:413> wait_for=<Future pending cb=[Task._wakeup()]> cb=[_wait.<locals>._on_completion() at c:\users\davidfallah\appdata\local\programs\python\python35-32\lib\asyncio\tasks.py:414]>
2016-09-06 12:33:51,219 [ERROR   ] asyncio - Task was destroyed but it is pending!
task: <Task pending coro=<Queue.get() running at c:\users\davidfallah\appdata\local\programs\python\python35-32\lib\asyncio\queues.py:168> wait_for=<Future pending cb=[Task._wakeup()]> cb=[_wait.<locals>._on_completion() at c:\users\davidfallah\appdata\local\programs\python\python35-32\lib\asyncio\tasks.py:414]>
Exception ignored in: <generator object Queue.get at 0x03643600>
Traceback (most recent call last):
  File "c:\users\davidfallah\appdata\local\programs\python\python35-32\lib\asyncio\queues.py", line 170, in get
  File "c:\users\davidfallah\appdata\local\programs\python\python35-32\lib\asyncio\futures.py", line 227, in cancel
  File "c:\users\davidfallah\appdata\local\programs\python\python35-32\lib\asyncio\futures.py", line 242, in _schedule_callbacks
  File "c:\users\davidfallah\appdata\local\programs\python\python35-32\lib\asyncio\base_events.py", line 497, in call_soon
  File "c:\users\davidfallah\appdata\local\programs\python\python35-32\lib\asyncio\base_events.py", line 506, in _call_soon
  File "c:\users\davidfallah\appdata\local\programs\python\python35-32\lib\asyncio\base_events.py", line 334, in _check_closed
RuntimeError: Event loop is closed

如果我改变 CommunicationService.stop() 直接取消响应侦听任务(而不是安排它)...

self._listen_task.cancel()
#self._loop.call_soon_threadsafe(self._listen_task.cancel)

我得到以下输出:

...
task: <Task pending coro=<CommunicationService._listen_for_responses() running at c:\users\davidfallah\pycharmprojects\testcube\testcube\comms.py:34> wait_for=<Future cancelled>>
...

其中 wait_for<Future cancelled> (而不是 wait_for=<Future pending cb=[Task._wakeup()]>> ) . 我不明白我如何调用 Task.cancel() 并且它说 future cancelled 但任务仍在等待中 . 我是否需要对该任务执行特殊操作,例如将代码包装在 try...except asyncio.CancelledException... 中?

如果它有用,那么这是同一命令的DEBUG级输出:

$ testcube -v DEBUG relays.0.enabled false
2016-09-06 12:48:10,145 [DEBUG   ] asyncio - Using selector: SelectSelector
2016-09-06 12:48:10,147 [DEBUG   ] Rx - CurrentThreadScheduler.schedule(state=None)
2016-09-06 12:48:10,147 [INFO    ] testcube.comms - Establishing connection to TestCube Web Service @ 127.0.0.1:36364
2016-09-06 12:48:10,153 [DEBUG   ] asyncio - connect <socket.socket fd=608, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6> to ('127.0.0.1', 36364)
2016-09-06 12:48:10,156 [DEBUG   ] asyncio - poll took 0.000 ms: 1 events
2016-09-06 12:48:10,163 [DEBUG   ] asyncio - <socket.socket fd=608, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6, laddr=('127.0.0.1', 56647), raddr=('127.0.0.1', 36364)> connected to 127.0.0.1:36364: (<_SelectorSocketTransport fd=608 read=polling write=<idle, bufsize=0>>, <websockets.client.WebSocketClientProtocol object at 0x03623BF0>)
2016-09-06 12:48:10,198 [DEBUG   ] asyncio - poll took 31.000 ms: 1 events
2016-09-06 12:48:10,202 [DEBUG   ] root - Connected using websocket address: ws://127.0.0.1:36364/
2016-09-06 12:48:10,202 [DEBUG   ] Rx - CurrentThreadScheduler.schedule(state=None)
2016-09-06 12:48:10,203 [DEBUG   ] testcube.components.core - Using write handler
2016-09-06 12:48:10,203 [DEBUG   ] root - Listening for responses
2016-09-06 12:48:10,205 [DEBUG   ] testcube.comms - Sending request: {"op": "replace", "value": false, "path": "testcube.relays[0].enabled"}
2016-09-06 12:48:10,208 [DEBUG   ] websockets.protocol - client >> Frame(fin=True, opcode=1, data=b'{"op": "replace", "value": false, "path": "testcube.relays[0].enabled"}')
2016-09-06 12:48:10,209 [DEBUG   ] asyncio - Close <_WindowsSelectorEventLoop running=False closed=False debug=True>
2016-09-06 12:48:10,222 [ERROR   ] asyncio - Task was destroyed but it is pending!
source_traceback: Object created at (most recent call last):
  File "C:\Users\davidfallah\AppData\Local\Programs\Python\Python35-32\Scripts\testcube-script.py", line 9, in <module>
    load_entry_point('testcube', 'console_scripts', 'testcube')()
  File "c:\users\davidfallah\pycharmprojects\testcube\testcube\testcube.py", line 198, in main
    cli(default_map=_get_default_settings())
  File "c:\users\davidfallah\appdata\local\programs\python\python35-32\lib\site-packages\click\core.py", line 716, in __call__
    return self.main(*args, **kwargs)
  File "c:\users\davidfallah\appdata\local\programs\python\python35-32\lib\site-packages\click\core.py", line 696, in main
    rv = self.invoke(ctx)
  File "c:\users\davidfallah\appdata\local\programs\python\python35-32\lib\site-packages\click\core.py", line 1057, in invoke
    Command.invoke(self, ctx)
  File "c:\users\davidfallah\appdata\local\programs\python\python35-32\lib\site-packages\click\core.py", line 889, in invoke
    return ctx.invoke(self.callback, **ctx.params)
  File "c:\users\davidfallah\appdata\local\programs\python\python35-32\lib\site-packages\click\core.py", line 534, in invoke
    return callback(*args, **kwargs)
  File "c:\users\davidfallah\appdata\local\programs\python\python35-32\lib\site-packages\click\decorators.py", line 17, in new_func
    return f(get_current_context(), *args, **kwargs)
  File "c:\users\davidfallah\pycharmprojects\testcube\testcube\testcube.py", line 168, in cli
    context.comms.establish_communication(hostname, port)
  File "c:\users\davidfallah\pycharmprojects\testcube\testcube\comms.py", line 48, in establish_communication
    self._listen_task = asyncio.ensure_future(self._listen_for_responses())
task: <Task pending coro=<CommunicationService._listen_for_responses() running at c:\users\davidfallah\pycharmprojects\testcube\testcube\comms.py:34> wait_for=<Future cancelled created at c:\users\davidfallah\appdata\local\programs\python\python35-32\lib\asyncio\base_events.py:252> created at c:\users\davidfallah\pycharmprojects\testcube\testcube\comms.py:48>
2016-09-06 12:48:10,223 [ERROR   ] asyncio - Task was destroyed but it is pending!
source_traceback: Object created at (most recent call last):
  File "C:\Users\davidfallah\AppData\Local\Programs\Python\Python35-32\Scripts\testcube-script.py", line 9, in <module>
    load_entry_point('testcube', 'console_scripts', 'testcube')()
  File "c:\users\davidfallah\pycharmprojects\testcube\testcube\testcube.py", line 198, in main
    cli(default_map=_get_default_settings())
  File "c:\users\davidfallah\appdata\local\programs\python\python35-32\lib\site-packages\click\core.py", line 716, in __call__
    return self.main(*args, **kwargs)
  File "c:\users\davidfallah\appdata\local\programs\python\python35-32\lib\site-packages\click\core.py", line 696, in main
    rv = self.invoke(ctx)
  File "c:\users\davidfallah\appdata\local\programs\python\python35-32\lib\site-packages\click\core.py", line 1057, in invoke
    Command.invoke(self, ctx)
  File "c:\users\davidfallah\appdata\local\programs\python\python35-32\lib\site-packages\click\core.py", line 889, in invoke
    return ctx.invoke(self.callback, **ctx.params)
  File "c:\users\davidfallah\appdata\local\programs\python\python35-32\lib\site-packages\click\core.py", line 534, in invoke
    return callback(*args, **kwargs)
  File "c:\users\davidfallah\appdata\local\programs\python\python35-32\lib\site-packages\click\decorators.py", line 17, in new_func
    return f(get_current_context(), *args, **kwargs)
  File "c:\users\davidfallah\pycharmprojects\testcube\testcube\testcube.py", line 168, in cli
    context.comms.establish_communication(hostname, port)
  File "c:\users\davidfallah\pycharmprojects\testcube\testcube\comms.py", line 47, in establish_communication
    self._ws = self._loop.run_until_complete(websockets.connect(websocket_address))
  File "c:\users\davidfallah\appdata\local\programs\python\python35-32\lib\asyncio\base_events.py", line 375, in run_until_complete
    self.run_forever()
  File "c:\users\davidfallah\appdata\local\programs\python\python35-32\lib\asyncio\base_events.py", line 345, in run_forever
    self._run_once()
  File "c:\users\davidfallah\appdata\local\programs\python\python35-32\lib\asyncio\base_events.py", line 1304, in _run_once
    handle._run()
  File "c:\users\davidfallah\appdata\local\programs\python\python35-32\lib\asyncio\events.py", line 125, in _run
    self._callback(*self._args)
  File "c:\users\davidfallah\appdata\local\programs\python\python35-32\lib\asyncio\streams.py", line 238, in connection_made
    self._stream_writer)
  File "c:\users\davidfallah\appdata\local\programs\python\python35-32\lib\site-packages\websockets\protocol.py", line 633, in client_connected
    self.worker_task = asyncio_ensure_future(self.run(), loop=self.loop)
task: <Task pending coro=<WebSocketCommonProtocol.run() running at c:\users\davidfallah\appdata\local\programs\python\python35-32\lib\site-packages\websockets\protocol.py:413> wait_for=<Future pending cb=[Task._wakeup()] created at c:\users\davidfallah\appdata\local\programs\python\python35-32\lib\asyncio\base_events.py:252> cb=[_wait.<locals>._on_completion() at c:\users\davidfallah\appdata\local\programs\python\python35-32\lib\asyncio\tasks.py:414] created at c:\users\davidfallah\appdata\local\programs\python\python35-32\lib\site-packages\websockets\protocol.py:633>
2016-09-06 12:48:10,223 [ERROR   ] asyncio - Task was destroyed but it is pending!
source_traceback: Object created at (most recent call last):
  File "C:\Users\davidfallah\AppData\Local\Programs\Python\Python35-32\Scripts\testcube-script.py", line 9, in <module>
    load_entry_point('testcube', 'console_scripts', 'testcube')()
  File "c:\users\davidfallah\pycharmprojects\testcube\testcube\testcube.py", line 198, in main
    cli(default_map=_get_default_settings())
  File "c:\users\davidfallah\appdata\local\programs\python\python35-32\lib\site-packages\click\core.py", line 716, in __call__
    return self.main(*args, **kwargs)
  File "c:\users\davidfallah\appdata\local\programs\python\python35-32\lib\site-packages\click\core.py", line 696, in main
    rv = self.invoke(ctx)
  File "c:\users\davidfallah\appdata\local\programs\python\python35-32\lib\site-packages\click\core.py", line 1060, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
  File "c:\users\davidfallah\appdata\local\programs\python\python35-32\lib\site-packages\click\core.py", line 1025, in _process_result
    **ctx.params)
  File "c:\users\davidfallah\appdata\local\programs\python\python35-32\lib\site-packages\click\core.py", line 534, in invoke
    return callback(*args, **kwargs)
  File "c:\users\davidfallah\appdata\local\programs\python\python35-32\lib\site-packages\click\decorators.py", line 17, in new_func
    return f(get_current_context(), *args, **kwargs)
  File "c:\users\davidfallah\pycharmprojects\testcube\testcube\testcube.py", line 190, in _handle_command_task
    result = loop.run_until_complete(task)
  File "c:\users\davidfallah\appdata\local\programs\python\python35-32\lib\asyncio\base_events.py", line 375, in run_until_complete
    self.run_forever()
  File "c:\users\davidfallah\appdata\local\programs\python\python35-32\lib\asyncio\base_events.py", line 345, in run_forever
    self._run_once()
  File "c:\users\davidfallah\appdata\local\programs\python\python35-32\lib\asyncio\base_events.py", line 1304, in _run_once
    handle._run()
  File "c:\users\davidfallah\appdata\local\programs\python\python35-32\lib\asyncio\events.py", line 125, in _run
    self._callback(*self._args)
  File "c:\users\davidfallah\appdata\local\programs\python\python35-32\lib\asyncio\tasks.py", line 239, in _step
    result = coro.send(None)
  File "c:\users\davidfallah\pycharmprojects\testcube\testcube\comms.py", line 34, in _listen_for_responses
    message = await self._ws.recv()
  File "c:\users\davidfallah\appdata\local\programs\python\python35-32\lib\site-packages\websockets\protocol.py", line 280, in recv
    self.messages.get(), loop=self.loop)
task: <Task pending coro=<Queue.get() running at c:\users\davidfallah\appdata\local\programs\python\python35-32\lib\asyncio\queues.py:168> wait_for=<Future pending cb=[Task._wakeup()] created at c:\users\davidfallah\appdata\local\programs\python\python35-32\lib\asyncio\base_events.py:252> cb=[_wait.<locals>._on_completion() at c:\users\davidfallah\appdata\local\programs\python\python35-32\lib\asyncio\tasks.py:414] created at c:\users\davidfallah\appdata\local\programs\python\python35-32\lib\site-packages\websockets\protocol.py:280>
Exception ignored in: <generator object Queue.get at 0x03641240>
Traceback (most recent call last):
  File "c:\users\davidfallah\appdata\local\programs\python\python35-32\lib\asyncio\queues.py", line 170, in get
  File "c:\users\davidfallah\appdata\local\programs\python\python35-32\lib\asyncio\futures.py", line 227, in cancel
  File "c:\users\davidfallah\appdata\local\programs\python\python35-32\lib\asyncio\futures.py", line 242, in _schedule_callbacks
  File "c:\users\davidfallah\appdata\local\programs\python\python35-32\lib\asyncio\base_events.py", line 497, in call_soon
  File "c:\users\davidfallah\appdata\local\programs\python\python35-32\lib\asyncio\base_events.py", line 506, in _call_soon
  File "c:\users\davidfallah\appdata\local\programs\python\python35-32\lib\asyncio\base_events.py", line 334, in _check_closed
RuntimeError: Event loop is closed

1 回答

  • 1

    我想通了 - 我需要定义 CommunicationService.stop() 如下:

    def stop(self):
        if self._listen_task is None or self._ws is None:
            return
    
        self._listen_task.cancel()
        self._loop.run_until_complete(asyncio.wait([self._listen_task, self._ws.close()]))
    
        self._listen_task = None
        self._ws = None
    

    作为可能最终遇到相关问题的其他任何人的文档,现在完整的清理代码:

    @cli.resultcallback()
    @click.pass_context
    def _handle_command_task(context, task: Coroutine, **_) -> None:
        if task:
            loop = asyncio.get_event_loop()
            result = loop.run_until_complete(task)
            context.comms.stop()
            loop.close()
            if result:
                print(result, end='')
    

相关问题