首页 文章

从另一个线程调度asyncio协程,没有一堆回调和同步等待

提问于
浏览
3

我不得不要求澄清this问题 .

我有一个发送消息的协程 send . 我想从 loop2 (在线程2中运行)中的 loop1 (在线程1中运行)中安排它:

async def send_threadsafe(self, message, current_loop=loop2, dest_loop=loop1):
    future = asyncio.run_coroutine_threadsafe(
        send(message), loop=dest_loop
    )

asyncio.run_coroutine_threadsafe 返回的 futureconcurrent.futures.Future ,无法异步等待 .

所以问题是:我如何正确地等待 future 和/或我应该如何安排我的 send 来获得一个等待的对象?

我知道我能做到:

async def send_threadsafe(...):
    future = ...
    result = await current_loop.run_in_executor(None, future.result)

但是有没有办法在不使用其他线程的情况下完成它?因为 run_in_executor 会将 future.result 发送到线程池,我不想利用该线程池 .

我不想使用 call_soon_threadsafe 的原因是它需要创建几个回调 . 首先,安排在 loop1 中运行 send . 其次,在 loop1 中运行 send 并在 loop2 中安排第三个回调 . 第三,将结果设置为在第一个回调中创建的未来(因为asyncio期货不是线程安全的,我无法设置 loop1 的结果) .

1 回答

  • 4

    您可以使用 asyncio.wrap_future 从并发的未来获得asyncio的未来:

    async def send_threadsafe(self, message, destination, *, loop=loop):
        concurrent = asyncio.run_coroutine_threadsafe(
            send(message), loop=destination)
        return await asyncio.wrap_future(concurrent, loop=loop)
    

    通过实现asyncio Actuator 可以实现相同的功能:

    from concurrent.futures import Executor
    
    class AsyncioExecutor(Executor):
    
        def __init__(self, loop):
            self.loop = loop
    
        def submit(self, fn, *args, **kwargs):
            coro = fn(*args, **kwargs)
            return asyncio.run_coroutine_threadsafe(coro, self.loop)
    

    例:

    executor = AsyncioExecutor(remote_loop)
    result = await loop.run_in_executor(executor, send, message)
    

相关问题