首页 文章

是否可以在事件循环已运行时运行asyncio.Server实例

提问于
浏览
2

我试图理解,是否可以在事件循环已经由 run_forever 方法运行时运行 asyncio.Server 实例(当然,从一个单独的线程) . 据我所知,如果循环已经运行,服务器可以通过 loop.run_until_complete(asyncio.start_server(...))await asyncio.start_server(...) 启动 . 第一种方式对我来说是不可接受的,因为循环已经由run_forever方法运行 . 但是我也可以't use the await expression, since I'从"loop area"外面启动它(例如从main方法,它不能被标记为异步,对吧?)

def loop_thread(loop):
    asyncio.set_event_loop(loop)
    try:
        loop.run_forever()
    finally:
        loop.close()
        print("loop clesed")

class SchedulerTestManager:
    def __init__(self): 
        ...

        self.loop = asyncio.get_event_loop()
        self.servers_loop_thread = threading.Thread(
            target=loop_thread, args=(self.loop, ))
         ...

    def start_test(self):
        self.servers_loop_thread.start()
        return self.servers_loop_thread

    def add_router(self, router):
        r = self.endpoint.add_router(router)
        host = router.ConnectionParameters.Host
        port = router.ConnectionParameters.Port
        srv = TcpServer(host, port)
        server_coro = asyncio.start_server(
            self.handle_connection, self.host, self.port)
        # does not work since add_router is not async
        # self.server = await server_coro
        # does not work, since the loop is already running
        # self.server = self.loop.run_until_complete(server_coro)
        return r


def maind():
   st_manager = SchedulerTestManager()
   thread = st_manager.start_test()
   router = st_manager.add_router(router)

对于cource,最简单的解决方案是在开始测试(运行循环)之前添加所有路由器(服务器) . 但我想尝试实现它,因此可以在测试已经运行时添加路由器 . 我认为 loop.call_sooncall_soon_threadsafe )方法可以帮助我,但似乎不能指出一个协程,而只是一个简单的函数 .

希望我的解释不是很混乱 . 提前致谢!

1 回答

  • 1

    对于在一个线程中执行的事件循环与在其他线程中执行的传统旧的良好线程代码之间的通信,您可以使用janus库 .

    它是一个带有两个接口的队列:异步和线程安全同步 .

    这是用法示例:

    import asyncio
    import janus
    
    loop = asyncio.get_event_loop()
    queue = janus.Queue(loop=loop)
    
    def threaded(sync_q):
        for i in range(100):
            sync_q.put(i)
        sync_q.join()
    
    @asyncio.coroutine
    def async_coro(async_q):
        for i in range(100):
            val = yield from async_q.get()
            assert val == i
            async_q.task_done()
    
    fut = loop.run_in_executor(None, threaded, queue.sync_q)
    loop.run_until_complete(async_coro(queue.async_q))
    loop.run_until_complete(fut)
    

    您可以创建一个任务,在循环中等待队列中的新消息,并根据请求启动新服务器 . 其他线程可能会将新消息推入队列,要求新服务器 .

相关问题