首页 文章

使用asyncio使websocket回调异步

提问于
浏览
3

我正在尝试使用带有Python 3.5.2的asynciowebsockets来实现一个基本的websocket客户端 .

基本上,我希望 connect_to_dealer 是一个阻塞调用,但是在另一个线程上等待websocket消息 .

在阅读了一些文档之后(我用Python进行了很少的exp),我得出的结论是 asyncio.ensure_future() 传递了一个协程( listen_for_message )是要走的路 .

现在,我可以在另一个线程上运行 listen_for_message ,但是从协同程序中我似乎无法使用 await 或任何其他机制来调用 synchronous . 如果我这样做,即使对于一个简单的 sleep ,执行也会永远等待(它会挂起) .

我想知道我做错了什么 .

async def listen_for_message(self, future, websocket):
    while (True):
        try:
            await asyncio.sleep(1) # It hangs here
            print('Listening for a message...')
            message = await websocket.recv() # If I remove the sleep, hangs here
            print("< {}".format(message))
            future.set_result(message)
            future.done()
        except websockets.ConnectionClosed as cc:
            print('Connection closed')
        except Exception as e:
            print('Something happened')

def handle_connect_message(self, future):
    # We must first remove the websocket-specific payload because we're only interested in the connect protocol msg
    print(future.result)

async def connect_to_dealer(self):
    print('connect to dealer')
    websocket = await websockets.connect('wss://mywebsocket'))
    hello_message = await websocket.recv()
    print("< {}".format(hello_message))
    # We need to parse the connection ID out of the message
    connection_id = hello_message['connectionId']
    print('Got connection id {}'.format(connection_id))
    sub_response = requests.put('https://subscribetotraffic{user_id}?connection={connection_id}'.format(user_id='username', connection_id=connection_id), headers=headers)
    if sub_response.status_code == 200:
        print('Now we\'re observing traffic')
    else:
        print('Oops request failed with {code}'.format(code=sub_response.status_code))
    # Now we need to handle messages but continue with the regular execution
    try:
        future = asyncio.get_event_loop().create_future()
        future.add_done_callback(self.handle_connect_message)
        asyncio.ensure_future(self.listen_for_message(future, websocket))
    except Exception as e:
        print(e)

1 回答

  • 2

    您需要使用明确的期货吗?

    使用 asyncio ,您可以使用 coroutinesTasks 的组合来实现大多数目的 . 任务本质上是包装的协同程序,它们在后台自行完成,独立于其他异步代码,因此您不必显式管理它们的流程或者用其他代码来处理它们 .

    我不完全确定你的最终目标,但也许下面详述的方法可以让你有所帮助:

    import asyncio
    
    async def listen_for_message():
    
        while True:
    
            await asyncio.sleep(0)
    
            try:
                print('Listening for a message...')
                message = await websocket.recv()
    
                print("< {}".format(message))
    
            except websockets.ConnectionClosed as cc:
                print('Connection closed')
    
            except Exception as e:
                print('Something happened')
    
    
    async def connect_to_dealer():
    
        print('connect to dealer')
        websocket = await websockets.connect('wss://mywebsocket')
    
        hello_message = await websocket.recv()
        print("< {}".format(hello_message))
    
        # We need to parse the connection ID out of the message
        connection_id = hello_message['connectionId']
        print('Got connection id {}'.format(connection_id))
    
        sub_response = requests.put('https://subscribetotraffic{user_id}?connection={connection_id}'.format(
            user_id='username', connection_id=connection_id), headers=headers)
    
        if sub_response.status_code == 200:
            print('Now we\'re observing traffic')
        else:
            print('Oops request failed with {code}'.format(code=sub_response.status_code))
    
    
    async def my_app():
    
        # this will block until connect_to_dealer() returns
        websocket = await connect_to_dealer()
    
        # start listen_for_message() in its own task wrapper, so doing it continues in the background
        asyncio.ensure_future(listen_for_message(websocket))
    
        # you can continue with other code here that can now coexist with listen_for_message()
    
    
    if __name__ == '__main__':
        loop = asyncio.get_event_loop()
        loop.run_until_complete(my_app())
        loop.run_forever()
    

相关问题