首页 文章

Asyncio aiohttp - redis Pub / Sub和websocket在单个处理程序中读/写

提问于
浏览
14

我正在玩aiohttp,看看它将如何作为带有websocket连接的移动应用程序的服务器应用程序 .

这里很简单"Hello world"例子(as gist here):

import asyncio
import aiohttp
from aiohttp import web


class WebsocketEchoHandler:

    @asyncio.coroutine
    def __call__(self, request):
        ws = web.WebSocketResponse()
        ws.start(request)

        print('Connection opened')
        try:
            while True:
                msg = yield from ws.receive()
                ws.send_str(msg.data + '/answer')
        except:
            pass
        finally:
            print('Connection closed')
        return ws


if __name__ == "__main__":
    app = aiohttp.web.Application()
    app.router.add_route('GET', '/ws', WebsocketEchoHandler())

    loop = asyncio.get_event_loop()
    handler = app.make_handler()

    f = loop.create_server(
        handler,
        '127.0.0.1',
        8080,
    )

    srv = loop.run_until_complete(f)
    print("Server started at {sock[0]}:{sock[1]}".format(
        sock=srv.sockets[0].getsockname()
    ))
    try:
        loop.run_forever()
    except KeyboardInterrupt:
        pass
    finally:
        loop.run_until_complete(handler.finish_connections(1.0))
        srv.close()
        loop.run_until_complete(srv.wait_closed())
        loop.run_until_complete(app.finish())
    loop.close()

问题

现在我想使用下面描述的结构(node server = python aiohttp) . 更具体地说,使用带有asyncio-redisRedis Pub/Sub机制来读取和写入我的WebsocketEchoHandler中的websocket连接和Redis .

WebsocketEchoHandler是一个简单的循环,所以我不确定应该怎么做 . 使用Tornadobrükva我只会使用回调 .

http://goldfirestudios.com/blog/136/Horizontally-Scaling-Node.js-and-WebSockets-with-Redis

额外(可能是offtopic)问题

由于我已经使用Redis,我应采取以下两种方法中的哪一种:

  • 喜欢"classic"网络应用程序,有一个控制器/视图的一切,使用Redis仅用于消息传递等 .

  • Web应用程序应该只是客户端和_2440708之间的一个层 - 也用作任务队列(最简单的Python RQ) . 每个请求都应该委托给 Worker .

编辑

来自http://goldfirestudios.com/blog/136/Horizontally-Scaling-Node.js-and-WebSockets-with-Redis的图片

编辑2

我似乎需要澄清一下 .

  • Websocket-only handler is shown above

  • Redis Pub/Sub handler might look like that:

class WebsocketEchoHandler:

    @asyncio.coroutine
    def __call__(self, request):
        ws = web.WebSocketResponse()
        ws.start(request)

        connection = yield from asyncio_redis.Connection.create(host='127.0.0.1', port=6379)
        subscriber = yield from connection.start_subscribe()
        yield from subscriber.subscribe(['ch1', 'ch2'])

        print('Connection opened')
        try:
            while True:
                msg = yield from subscriber.next_published()
                ws.send_str(msg.value + '/answer')
        except:
            pass
        finally:
            print('Connection closed')
        return ws

该处理程序只订阅Redis通道ch1和ch2,并将从这些通道收到的每条消息发送到websocket .

  • I want to have this handler:
class WebsocketEchoHandler:

    @asyncio.coroutine
    def __call__(self, request):
        ws = web.WebSocketResponse()
        ws.start(request)

        connection = yield from asyncio_redis.Connection.create(host='127.0.0.1', port=6379)
        subscriber = yield from connection.start_subscribe()
        yield from subscriber.subscribe(['ch1', 'ch2'])

        print('Connection opened')
        try:
            while True:
                # If message recived from redis OR from websocket
                msg_ws = yield from ws.receive()
                msg_redis = yield from subscriber.next_published()
                if msg_ws:
                    # push to redis / do something else
                    self.on_msg_from_ws(msg_ws)
                if msg_redis:
                    self.on_msg_from_redis(msg_redis)
        except:
            pass
        finally:
            print('Connection closed')
        return ws

但是下面的代码总是按顺序调用,所以读取来自Redis的websocket块:

msg_ws = yield from ws.receive()
msg_redis = yield from subscriber.next_published()

我希望在事件是从两个来源之一收到的消息的事件上完成阅读 .

2 回答

  • 21

    您应该使用两个 while 循环 - 一个用于处理来自websocket的消息,另一个用于处理来自redis的消息 . 你的主处理程序可以启动两个协同程序,一个处理每个循环,然后等待它们:

    class WebsocketEchoHandler:
        @asyncio.coroutine
        def __call__(self, request):
            ws = web.WebSocketResponse()
            ws.start(request)
    
            connection = yield from asyncio_redis.Connection.create(host='127.0.0.1', port=6379)
            subscriber = yield from connection.start_subscribe()
            yield from subscriber.subscribe(['ch1', 'ch2'])
    
            print('Connection opened')
            try:
                # Kick off both coroutines in parallel, and then block
                # until both are completed.
                yield from asyncio.gather(self.handle_ws(ws), self.handle_redis(subscriber))
            except Exception as e:  # Don't do except: pass
                import traceback
                traceback.print_exc()
            finally:
                print('Connection closed')
            return ws
    
        @asyncio.coroutine
        def handle_ws(self, ws):
            while True:
                msg_ws = yield from ws.receive()
                if msg_ws:
                    self.on_msg_from_ws(msg_ws)
    
        @asyncio.coroutine
        def handle_redis(self, subscriber):
            while True:
                msg_redis = yield from subscriber.next_published()
                if msg_redis:
                    self.on_msg_from_redis(msg_redis)
    

    通过这种方式,您可以从两个潜在来源中的任何一个进行阅读,而无需关心另一个 .

  • 1

    最近我们可以在python 3.5及更高版本中使用async await ..

    async def task1(ws):
        async for msg in ws:
            if msg.type == WSMsgType.TEXT:
                data = msg.data
                print(data)
                if data:
                    await ws.send_str('pong')
    ## ch is a redis channel
    async def task2(ch):
        async for msg in ch1.iter(encoding="utf-8", decoder=json.loads):
            print("receving", msg)
            user_token = msg['token']
            if user_token in r_cons.keys():
                _ws = r_cons[user_token]
                await  _ws.send_json(msg)
    
    coroutines = list()
    coroutines.append(task1(ws))
    coroutines.append(task2(ch1))
    
    await asyncio.gather(*coroutines)
    

    这就是我所做的 . 当websockets需要等待来自mutli源的消息时 .

    这里的要点是使用asyncio.gather像@dano一样运行两个corotine .

相关问题