首页 文章

结合2个基于asyncio的代码段

提问于
浏览
1

我正在使用Autobahn asyncio系统(谈论Websocket WAMP协议),它工作正常,我可以处理传入的RPC调用和pubsub . 我的问题是,我现在必须连接TCP套接字,并在通过高速公路部分进入RPC调用时立即通过这些套接字发送信息 .

高速公路部分工作正常如下:

from autobahn.asyncio.component import Component, run
from asyncio import sleep
from autobahn.asyncio.wamp import ApplicationSession, ApplicationRunner

@comp.on_join
async def joined(session, details):
    print("Connected to websocket")

    def on_message(msg):
        msg = json.loads(msg)
        print(msg)

    def some_rpc(with_data):
        print("Doing something with the data")
        return json.dumps({'status': 'OK'})

    try:
        session.subscribe(on_message, u'some_pubsub_topic')
        session.register(some_rpc, u'some_rpc_call')
        print("RPC and Pubsub initialized")

    except Exception as e:
        print("could not subscribe to topic: {0}".format(e))

if __name__ == "__main__":
     run([comp])

但是现在我需要能够连接到多个常规TCP套接字:

class SocketClient(asyncio.Protocol):
    def __init__(self, loop):
        self.data = b''
        self.loop = loop

    def connection_made(self, transport):
        self.transport = transport
        print('connected')

    def data_received(self, data):
        print('Data received: {!r}'.format(data.decode()))

    def send(self, data):
        self.transport.write(data)

    def connection_lost(self, exc):
        print('The server closed the connection')
        print('Stop the event loop')
        self.loop.stop()

loop = asyncio.get_event_loop()

c=loop.create_connection(lambda: SocketClient(loop),
                              '192.168.0.219', 6773)
loop.run_until_complete(c)
loop.run_forever()
loop.close()

问题在于,当我将两者结合起来并执行此操作时:

def some_rpc(with_data):
    c.send('test')
    return json.dumps({'status': 'OK'})

它咆哮着我告诉我:

StopIteration在处理上述异常期间,发生了另一个异常:Traceback(最近一次调用last):onMessage中的文件“/usr/lib/python3.5/site-packages/autobahn/wamp/websocket.py”,第95行self._session.onMessage(msg)文件“/usr/lib/python3.5/site-packages/autobahn/wamp/protocol.py”,第894行,在onMessage on_reply = txaio.as_future(endpoint.fn,* invoke_args, ** invoke_kwargs)文件“/usr/lib/python3.5/site-packages/txaio/aio.py”,第400行,在as_future中返回create_future_error(create_failure())文件“/usr/lib/python3.5/site -packages / txaio / aio.py“,第393行,在create_future_error中拒绝(f,错误)文件”/usr/lib/python3.5/site-packages/txaio/aio.py“,第462行,拒绝未来 . set_exception(error.value)文件“/usr/lib64/python3.5/asyncio/futures.py”,第365行,在set_exception中引发TypeError(“StopIteration与生成器交互严重”TypeError:StopIteration与生成器交互不良,无法引发走向未来

有没有人知道如何从RPC调用函数中调用send函数?

1 回答

  • 0

    在这段代码中:

    c=loop.create_connection(lambda: SocketClient(loop),
                                  '192.168.0.219', 6773)
    # [...]
    def some_rpc(with_data):
        c.send('test')
        return json.dumps({'status': 'OK'})
    

    create_connection is a coroutine function,所以 c 包含一个协程对象 . 这样的对象确实有 send 方法,但与通过网络发送内容完全无关 . 在调用 create_connection 之后,您可能希望通过以下方式获得生成的传输:

    transport, ignore = loop.run_until_complete(c)
    

    然后使用transport.write(),而不是 c.send() .

相关问题