首页 文章

aiohttp中的超时WebSocket连接

提问于
浏览
1

我的WebSocket服务器实现向全世界开放,但客户端需要在 Build 连接或服务器应关闭连接后发送身份验证消息 .

我怎样才能在aiohttp中实现它?看来,我需要做以下事情:

  • 为每个套接字连接创建一个 on_open 方法:我找不到一种方法(类似于Tornado中的on_open)来创建这样的事件 .

  • 创建计时器:可以使用主事件循环的asyncio的 sleepcall_back 方法 . 但我找不到将WebSocketResponse发送到回调函数的方法:

await asyncio.sleep(10, timer, loop=request.app.loop)

  • 如果未经过身份验证,则关闭连接

这就是我之前使用Tornado的原因:

def open(self, *args, **kwargs):
    self.timeout = ioloop.IOLoop.instance().add_timeout(
        datetime.timedelta(seconds=60),
        self._close_on_timeout
    )

def remove_timeout_timer(self):
    ioloop.IOLoop.instance().remove_timeout(self.timeout)
    self.timeout = None

def on_message(self, message):
    if message = 'AUTHENTICATE':
        self.authenticated = True
        self.remove_timeout_timer

def _close_on_timeout(self):
    if not self.authenticated:
        if self.ws_connection:
            self.close()

以下是我使用aiohttp设置计时器的方法:

async def ensure_client_logged(ws):
    await asyncio.sleep(3)  # wait 3 seconds
    await ws.send_str('hello')

async def ws_handler(request):
    ws = web.WebSocketResponse()

    asyncio.ensure_future(ensure_client_logged(ws), loop=request.app.loop)

但是代码以阻塞的方式运行,这意味着服务器在睡眠时变得没有响应 .

有人可以指点我正确的方向吗?

2 回答

  • 0

    您需要为身份验证过程 Build 截止日期 . asyncio.wait_for是一种方便的方法:

    async def ws_handler(request):
        loop = asyncio.get_event_loop()
        ws = web.WebSocketResponse()
        loop.create_task(handle_client(ws))
    
    async def handle_client(ws):
        try:
            authenticated = await asyncio.wait_for(_authenticate(ws), 10)
        except asyncio.TimeoutError:
            authenticated = False
        if not authenticated:
            ws.close()
            return
        # continue talking to the client
    
    async def _authenticate(ws):
        # implement authentication here, without worrying about
        # timeout - the coroutine will be automatically canceled
        # once the timeout elapses
        ...
        return True  # if successfully authenticated
    
  • 0

    这是未来用户的好处的完整工作示例:

    from aiohttp import web
    import asyncio
    
    async def wait_for_authentication(ws, app):
        async for msg in ws:
            if msg.type == web.WSMsgType.TEXT and msg.data == 'AUTHENTICATE':  # Implement your own authentication
                await ws.send_str('WELCOME')
                return True
            else:
                await ws.send_str('NOT AUTHENTICATED')
    
    
    async def authenticate(ws, app) -> bool:
        try:
            authenticated = await asyncio.wait_for(wait_for_authentication(ws, app), 5)
        except asyncio.TimeoutError:
            authenticated = False
    
        if not authenticated:
            await ws.send_str('The AUTHENTICATE command was not received. Closing the connection...')
            await ws.close()
            return False
    
    
    async def ws_handler(request):
        ws = web.WebSocketResponse()
        await ws.prepare(request)
    
        await request.app.loop.create_task(authenticate(ws, request.app))
    
        async for msg in ws:
            if msg.type != web.WSMsgType.TEXT:
                continue
    
            await ws.send_str(msg.data)
    
    def init():
        app = web.Application()
    
        app.router.add_get('/', ws_handler)
    
        return app
    
    web.run_app(init())
    

相关问题