首页 文章

如何在asyncio上重新连接套接字?

提问于
浏览
11

我想在app.py上创建两个带有asyncio的协议(TcpClient和UdpServer),其中TcpClient将与server.py和UdpServer作为UDP服务器 Build 持久连接:

我需要的:
a)两种通信协议:相互调用方法 . 这只是在第一个连接上工作 . 如果TcpClient重新连接,则无法再次发送来自UdpServer的字符串"send to tcp." . 我检查 print(self) 并且TcpClient创建一个新实例,旧的仍然存在,但没有连接,但我不知道如何重构 . 我认为我以错误的方式使用asyncio .
b)当TcpClient与server.py断开连接时,等待5s并再次尝试重新连接,依此类推 . 我尝试使用asyncio的 call_later() ,但我认为有一种本地方式来做到这一点,而不是一个技巧 .
c)当我启动app.py时,如果TcpClient可以't to connect I would like to try to reconnect again after 5s, and so on. I dont'知道如何做到这一点 .

这是我的app.py server.py的示例测试 . server.py仅用于测试 - 这将是另一种语言 .

只是说我尝试了什么:
1)当我启动app.py并且server.py关闭时,app.py不会重试 .
2)当app.py连接到server.py并且服务器关闭并快速启动时,TcpClient重新连接,但我可以't more to connect each other methods on the new instance and send string 2994114 to the server.py, just the old, where don't有更多连接 .
3)如果我使用 asyncio.async() 而不是 run_until_complete() 我无法从其他协议调用方法 .

我把app.py和server.py放在这里,所以你可以复制并运行测试 .

我使用 ncat localhost 9000 -u -v 发送字符串"send to tcp." . 此字符串需要打印在UdpServer类上并传递给TcpClient类上的方法send_data_to_tcp,此方法将字符串发送到server.py . < - 首次重新连接tcpClient后,这不起作用 .

我正在使用Python 3.4.0 .

非常感谢 .

app.py:

import asyncio

#TCP client
class TcpClient(asyncio.Protocol):
    message = 'Testing'

    def connection_made(self, transport):
        self.transport = transport
        self.transport.write(self.message.encode())
        print('data sent: {}'.format(self.message))
        server_udp[1].tcp_client_connected()


    def data_received(self, data):
        self.data = format(data.decode())
        print('data received: {}'.format(data.decode()))
        if self.data == 'Testing':
            server_udp[1].send_data_to_udp(self.data)

    def send_data_to_tcp(self, data):
        self.transport.write(data.encode())

    def connection_lost(self, exc):
        msg = 'Connection lost with the server...'
        info = self.transport.get_extra_info('peername')
        server_udp[1].tcp_client_disconnected(msg, info)


#UDP Server
class UdpServer(asyncio.DatagramProtocol):

    CLIENT_TCP_TIMEOUT = 5.0

    def __init__(self):
        self.client_tcp_timeout = None

    def connection_made(self, transport):
        print('start', transport)
        self.transport = transport

    def datagram_received(self, data, addr):
        self.data = data.strip()
        self.data = self.data.decode()
        print('Data received:', self.data, addr)
        if self.data == 'send to tcp.':
            client_tcp[1].send_data_to_tcp(self.data)

    def connection_lost(self, exc):
        print('stop', exc)

    def send_data_to_udp(self, data):
        print('Receiving on UDPServer Class: ', (data))

    def connect_client_tcp(self):
        coro = loop.create_connection(TcpClient, 'localhost', 8000)
        #client_tcp = loop.run_until_complete(coro)
        client_tcp = asyncio.async(coro)

    def tcp_client_disconnected(self, data, info):
        print(data)
        self.client_tcp_info = info
        self.client_tcp_timeout = asyncio.get_event_loop().call_later(self.CLIENT_TCP_TIMEOUT, self.connect_client_tcp)

    def tcp_client_connected(self):
        if self.client_tcp_timeout:
            self.client_tcp_timeout.cancel()
            print('call_later cancel.')

loop = asyncio.get_event_loop()

#UDP Server
coro = loop.create_datagram_endpoint(UdpServer, local_addr=('localhost', 9000)) 
#server_udp = asyncio.Task(coro)
server_udp = loop.run_until_complete(coro)


#TCP client
coro = loop.create_connection(TcpClient, 'localhost', 8000)
#client_tcp = asyncio.async(coro)
client_tcp = loop.run_until_complete(coro)

loop.run_forever()

server.py:

import asyncio

class EchoServer(asyncio.Protocol):
    def connection_made(self, transport):
        peername = transport.get_extra_info('peername')
        print('connection from {}'.format(peername))
        self.transport = transport

    def data_received(self, data):
        print('data received: {}'.format(data.decode()))
        self.transport.write(data)

        # close the socket
        #self.transport.close()

    #def connection_lost(self):
    #    print('server closed the connection')



loop = asyncio.get_event_loop()
coro = loop.create_server(EchoServer, 'localhost', 8000)
server = loop.run_until_complete(coro)
print(server)
print(dir(server))
print(dir(server.sockets))

print('serving on {}'.format(server.sockets[0].getsockname()))

try:
    loop.run_forever()
except KeyboardInterrupt:
    print("exit")
finally:
    server.close()
    loop.close()

1 回答

  • 13

    你真的只需要一些小修正 . 首先,我写了一个协程来处理连接重试:

    @asyncio.coroutine
    def do_connect():
        global tcp_server  # Make sure we use the global tcp_server
        while True:
            try:
                tcp_server = yield from loop.create_connection(TcpClient, 
                                                               'localhost', 8000)
            except OSError:
                print("Server not up retrying in 5 seconds...")
                yield from asyncio.sleep(5)
            else:
                break
    

    然后我们用它来启动所有事情:

    loop = asyncio.get_event_loop()
    
    #UDP Server
    coro = loop.create_datagram_endpoint(UdpServer, local_addr=('localhost', 9000)) 
    server_udp = loop.run_until_complete(coro)
    
    #TCP client
    loop.run_until_complete(do_connect())
    
    loop.run_forever()
    

    接下来的部分是在app.py启动后处理服务器关闭/返回 . 我们需要修复 tcp_client_disconnectedconnect_client_tcp 才能正确处理:

    def connect_client_tcp(self):
        global client_tcp
        task = asyncio.async(do_connect())
        def cb(result):
            client_tcp = result
        task.add_done_callback(cb)
    
    def tcp_client_disconnected(self, data, info):
        print(data)
        self.client_tcp_info = info
        self.client_tcp_timeout = loop.call_later(self.CLIENT_TCP_TIMEOUT, self.connect_client_tcp)
    

    有趣的是 connect_client_tcp . 您的原始版本有两个问题:

    • 您将 client_tcp 直接分配给 asyncio.async(coro) 的结果,这意味着 client_tcp 已分配给 asyncio.Task . 那不是你想要的;您希望将 client_tcp 分配给已完成的 asyncio.Task 的结果 . 我们通过使用 task.add_done_callbackclient_tcp 分配给 Task 的结果完成后实现了这一点 .

    • 您忘记了方法顶部的 global client_tcp . 没有它,你只是创建了一个名为 client_tcp 的局部变量,它在 connect_client_tcp 的末尾被抛弃了 .

    一旦修复了这些问题,我就可以随时运行 app.py ,启动/停止 serv.py ,但是当所有三个组件一起运行时,始终会看到所有消息从 ncat 正确传递到 serv.py .

    这是完整的 app.py ,便于复制/粘贴:

    import asyncio
    
    #TCP client
    class TcpClient(asyncio.Protocol):
        message = 'Testing'
    
        def connection_made(self, transport):
            self.transport = transport
            self.transport.write(self.message.encode())
            print('data sent: {}'.format(self.message))
            server_udp[1].tcp_client_connected()
    
    
        def data_received(self, data):
            self.data = format(data.decode())
            print('data received: {}'.format(data.decode()))
            if self.data == 'Testing':
                server_udp[1].send_data_to_udp(self.data)
    
        def send_data_to_tcp(self, data):
            self.transport.write(data.encode())
    
        def connection_lost(self, exc):
            msg = 'Connection lost with the server...'
            info = self.transport.get_extra_info('peername')
            server_udp[1].tcp_client_disconnected(msg, info)
    
    
    #UDP Server
    class UdpServer(asyncio.DatagramProtocol):
    
        CLIENT_TCP_TIMEOUT = 5.0
    
        def __init__(self):
            self.client_tcp_timeout = None
    
        def connection_made(self, transport):
            print('start', transport)
            self.transport = transport
    
        def datagram_received(self, data, addr):
            self.data = data.strip()
            self.data = self.data.decode()
            print('Data received:', self.data, addr)
            if self.data == 'send to tcp.':
                client_tcp[1].send_data_to_tcp(self.data)
    
        def connection_lost(self, exc):
            print('stop', exc)
    
        def send_data_to_udp(self, data):
            print('Receiving on UDPServer Class: ', (data))
    
        def connect_client_tcp(self):
            global client_tcp
            coro = loop.create_connection(TcpClient, 'localhost', 8000)
            task = asyncio.async(do_connect())
            def cb(result):
                client_tcp = result
            task.add_done_callback(cb)
    
        def tcp_client_disconnected(self, data, info):
            print(data)
            self.client_tcp_info = info
            self.client_tcp_timeout = loop.call_later(self.CLIENT_TCP_TIMEOUT, self.connect_client_tcp)
    
        def tcp_client_connected(self):
            if self.client_tcp_timeout:
                self.client_tcp_timeout.cancel()
                print('call_later cancel.')
    
    @asyncio.coroutine
    def do_connect():
        global client_tcp
        while True:
            try:
                client_tcp = yield from loop.create_connection(TcpClient, 'localhost', 8000)
            except OSError:
                print("Server not up retrying in 5 seconds...")
                yield from asyncio.sleep(1)
            else:
                break
    
    loop = asyncio.get_event_loop()
    
    #UDP Server
    coro = loop.create_datagram_endpoint(UdpServer, local_addr=('localhost', 9000))
    server_udp = loop.run_until_complete(coro)
    
    #TCP client
    loop.run_until_complete(do_connect())
    
    loop.run_forever()
    

相关问题