首页 文章

如果事件循环已在运行,如何在方法内等待协程同步完成?

提问于
浏览
2

我正在尝试创建一个基于Python的CLI,它通过websockets与Web服务进行通信 . 我遇到的一个问题是CLI对Web服务的请求间歇性地无法处理 . 查看来自Web服务的日志,我可以看到问题是由于这些请求经常在套接字关闭的同时(甚至之后)发生的事实引起的:

2016-09-13 13:28:10,930 [22 ] INFO  DeviceBridge - Device bridge has opened
2016-09-13 13:28:11,936 [21 ] DEBUG DeviceBridge - Device bridge has received message
2016-09-13 13:28:11,937 [21 ] DEBUG DeviceBridge - Device bridge has received valid message
2016-09-13 13:28:11,937 [21 ] WARN  DeviceBridge - Unable to process request: {"value": false, "path": "testcube.pwms[0].enabled", "op": "replace"}
2016-09-13 13:28:11,936 [5  ] DEBUG DeviceBridge - Device bridge has closed

在我的CLI中,我定义了一个类 CommunicationService ,它负责处理与Web服务的所有直接通信 . 在内部,它使用websockets包来处理通信,它本身 Build 在 asyncio 之上 .

CommunicationService 包含以下发送请求的方法:

def send_request(self, request: str) -> None:
    logger.debug('Sending request: {}'.format(request))
    asyncio.ensure_future(self._ws.send(request))

...其中 ws 是另一个方法中先前打开的websocket:

self._ws = await websockets.connect(websocket_address)

我想要的是能够等待 asyncio.ensure_future 返回的未来,并在必要时暂停一会儿,以便在websocket关闭之前给Web服务时间处理请求 .

但是,由于 send_request 是一种同步方法,它不能简单地用于这些期货 . 使它异步是没有意义的,因为没有什么可以等待它返回的协程对象 . 我也不能使用 loop.run_until_complete ,因为循环在调用时已经在运行 .

我发现有人描述的问题与我在mail.python.org的问题非常相似 . 在该线程中发布的解决方案是在循环已经运行的情况下使函数返回coroutine对象:

def aio_map(coro, iterable, loop=None):
    if loop is None:
        loop = asyncio.get_event_loop()

    coroutines = map(coro, iterable)
    coros = asyncio.gather(*coroutines, return_exceptions=True, loop=loop)

    if loop.is_running():
        return coros
    else:
        return loop.run_until_complete(coros)

这对我来说是不可能的,因为我正在使用PyRx(反应框架的Python实现)并且 send_request 仅被称为Rx observable的订阅者,这意味着返回值被丢弃并且不可用于我的代码:

class AnonymousObserver(ObserverBase):
    ...
    def _on_next_core(self, value):
        self._next(value)

另一方面,我不确定这是否是某种问题 asyncio 只是没有得到它,但我发现使用起来非常令人沮丧 . 在C#中(例如),我需要做的就是如下所示:

void SendRequest(string request)
{
    this.ws.Send(request).Wait();
    // Task.Delay(500).Wait();  // Uncomment If necessary
}

同时,被迫放弃了.1444916_ 's version of 1444917 unhelpfully just returns another coroutine that I'米 .

Update

我找到了解决这个问题的方法似乎有效 . 我有一个异步回调,在命令执行后和CLI终止之前执行,所以我只是从这个改变了...

async def after_command():
    await comms.stop()

......对此:

async def after_command():
    await asyncio.sleep(0.25)  # Allow time for communication
    await comms.stop()

不过,我仍然乐意接受这个问题的任何答案以供将来参考 . 在其他情况下,我可能无法依赖这样的解决方法,我仍然认为在 send_request 内执行延迟会更好的做法,以便 CommunicationService 的客户端不必担心时间问题 .

关于文森特的问题:

你的循环是在另一个线程中运行,还是由某些回调调用send_request?

一切都在同一个线程中运行 - 它是异步的,它们不会通过调用CLI顶层的 loop.run_until_complete 来执行 - 这意味着循环在它们执行中途的时候运行并且正在运行请求(通过间接调用 send_request ) .

Update 2

这是基于Vincent建议添加“完成”回调的解决方案 .

将新的布尔字段 _busy 添加到 CommunicationService 以表示是否正在发生通信活动 .

CommunicationService.send_request 被修改为在发送请求之前设置 _busy 为true,然后在完成后提供回调 _ws.send 以重置 _busy

def send_request(self, request: str) -> None:
    logger.debug('Sending request: {}'.format(request))

    def callback(_):
        self._busy = False

    self._busy = True
    asyncio.ensure_future(self._ws.send(request)).add_done_callback(callback)

CommunicationService.stop 现在已实现为等待此标志在进行之前设置为false:

async def stop(self) -> None:
    """
    Terminate communications with TestCube Web Service.
    """
    if self._listen_task is None or self._ws is None:
        return

    # Wait for comms activity to stop.
    while self._busy:
        await asyncio.sleep(0.1)

    # Allow short delay after final request is processed.
    await asyncio.sleep(0.1)

    self._listen_task.cancel()
    await asyncio.wait([self._listen_task, self._ws.close()])

    self._listen_task = None
    self._ws = None
    logger.info('Terminated connection to TestCube Web Service')

这似乎也有效,至少这种方式所有通信时序逻辑都被封装在 CommunicationService 类中 .

Update 3

基于Vincent提案的更好的解决方案 .

而不是 self._busy ,我们有 self._send_request_tasks = [] .

新的 send_request 实施:

def send_request(self, request: str) -> None:
    logger.debug('Sending request: {}'.format(request))

    task = asyncio.ensure_future(self._ws.send(request))
    self._send_request_tasks.append(task)

新的 stop 实施:

async def stop(self) -> None:
    if self._listen_task is None or self._ws is None:
        return

    # Wait for comms activity to stop.
    if self._send_request_tasks:
        await asyncio.wait(self._send_request_tasks)
    ...

2 回答

  • 2

    您可以使用 set 任务:

    self._send_request_tasks = set()
    

    使用 ensure_future 安排任务并使用 add_done_callback 进行清理:

    def send_request(self, request: str) -> None:
        task = asyncio.ensure_future(self._ws.send(request))
        self._send_request_tasks.add(task)
        task.add_done_callback(self._send_request_tasks.remove)
    

    等待 set 任务完成:

    async def stop(self):
        if self._send_request_tasks:
            await asyncio.wait(self._send_request_tasks)
    
  • 1

    鉴于您不在异步函数中,您可以使用 yield from 关键字自己有效地实现 await . 以下代码将阻止,直到将来返回:

    def send_request(self, request: str) -> None:
        logger.debug('Sending request: {}'.format(request))
        future = asyncio.ensure_future(self._ws.send(request))
        yield from future.__await__()
    

相关问题