首页 文章

将作业提交到asyncio事件循环

提问于
浏览
7

我想将一个线程中的作业提交到 asyncio 事件循环(就像run_in_executor,但反之亦然) .

以下是 asyncio 文档中关于concurrency and multithreading的内容:

要从其他线程安排回调,应使用BaseEventLoop.call_soon_threadsafe()方法 . 从不同的线程调度协程的示例:loop.call_soon_threadsafe(asyncio.async,coro_func())

这工作正常,但协同程序的结果丢失了 .

相反,可以使用一个函数将完成的回调添加到 async (或 ensure_future )返回的未来,以便线程可以通过concurrent.futures.Future访问结果 .

是否有特殊原因导致标准库中未实现此类功能?或者我错过了一种更简单的方法来实现这一目标?

1 回答

  • 6

    我的请求已经完成,run_coroutine_threadsafe函数已经实现here .

    例:

    def target(loop, timeout=None):
        future = asyncio.run_coroutine_threadsafe(add(1, b=2), loop)
        return future.result(timeout)
    
    async def add(a, b):
        await asyncio.sleep(1)
        return a + b
    
    loop = asyncio.get_event_loop()
    future = loop.run_in_executor(None, target, loop)
    assert loop.run_until_complete(future) == 3
    

    我最初发布了一个concurrent.futures.Executor的子类,仍然可以实现为:

    class LoopExecutor(concurrent.futures.Executor):
        """An Executor subclass that uses an event loop 
        to execute calls asynchronously."""
    
        def __init__(self, loop=None):
            """Initialize the executor with a given loop."""
            self.loop = loop or asyncio.get_event_loop()
    
        def submit(self, fn, *args, **kwargs):
            """Schedule the callable, fn, to be executed as fn(*args **kwargs).
            Return a Future object representing the execution of the callable."""
            coro = asyncio.coroutine(fn)(*args, **kwargs)
            return asyncio.run_coroutine_threadsafe(coro, self.loop)
    

相关问题