首页 文章

asyncio服务器和客户端来处理来自控制台的输入

提问于
浏览
0

我有一个asyncio TCP服务器从客户端接收消息,在服务器上执行stuff()并发回文本 . 服务器在正确接收和发送数据的意义上运行良好 . 问题是我无法从客户端的服务器取回消息,因为我在控制台输入时有阻塞例程(基本上从不执行data_received方法) . 只有exit命令工作正常(它关闭循环) . 怎么解决这个?这是服务器和客户端代码 . 它基本上是EchoClient asyncio版本,还有一些练习的管道代码 .

# client.py
import abc
import asyncio
import sys

MENU = '''
a) do x
b) do y
c) exit
'''

loop_ = asyncio.get_event_loop()


class XCommand:
    def run(self):
        self.client.send_data_to_tcp('X:')  # to bytes


class YCommand(Command):
    def run(self):
         s = input('Input for Y ###  ')
         self.client.send_data_to_tcp('Y:' + s)


class ExitCommand(Command):
    def run(self):
        self.client.send_data_to_tcp('EXIT:')
        print('Goodbye!')
        loop_.close()
        exit()


class CommandFactory:
    _cmds = {'a': ACommand,
         'b': BCommand,
         'c': ExitCommand,
         }

    @classmethod
    def get_cmd(cls, cmd):
        cmd_cls = cls._cmds.get(cmd)
        return cmd_cls


def show_menu(client):
    print(MENU)
    while True:
        command = input('Insert Command$: ')
        cmd_cls = CommandFactory.get_cmd(command)
        if not cmd_cls:
            print('Unknown: {}'.format(command))
            continue
        cmd_cls(client).run()


class Client(asyncio.Protocol):
    def __init__(self, loop):
        self.loop = loop
        self.transport = None

    def connection_made(self, transport):
        self.transport = transport

    def data_received(self, data):
        print('Data received from server: \n{!r}'.format(data.decode()), flush=True)

    def send_data_to_tcp(self, data):
        self.transport.write(data.encode())

    def connection_lost(self, exc):
        print('The server closed the connection')
        print('Stop the event loop')
        self.loop.stop()


def main():

    client = Client(loop_)
    coro = loop_.create_connection(lambda: client, '127.0.0.1', 10888)
    loop_.run_until_complete(coro)
    loop_.run_in_executor(None, show_menu(client))  # I've tried this...not working

    loop_.run_forever()
    loop_.close()

if __name__ == '__main__':
    main()


# server.py
import abc
import asyncio
import sys
from asyncio_exercise.db import DB


class ACommand:
    @classmethod
    def run(cls, db, param1=None, param2=None):
        res = db.a()
        if not res:
            return '>>>>>>>>>>> Empty <<<<<<<<<<<<<'
        return '\n'.join('{}: {}'.format(col, val) for col, val in res.items())


class BCommand:
    @classmethod
    def run(cls, db, param1=None, param2=None):
        db.b(param1, param2)
        return 'B Ok!'


class ExitCommand:
    @classmethod
    def run(cls, db, param1=None, param2=None):
        loop.close()
        server.close()
        loop.run_until_complete(server.wait_closed())
        print('Buona giornata!!!')
        sys.exit(0)

class CommandFactory:
    _cmds = {'X': ACommand,
         'Y': BCommand,
         'EXIT': ExitCommand}

    @classmethod
    def get_cmd(cls, cmd):
        tokens = cmd.split(':')
        cmd = tokens[0]
        if len(tokens) == 1:
            param1, param2 = None, None
        else:
            param1, param2 = (tokens[1], tokens[2]) if len(tokens) == 3 else (tokens[1], None)
        cmd_cls = cls._cmds.get(cmd)
        return cmd_cls, param1, param2


class Server(asyncio.Protocol):
    db_filename = '../data/db'
    db = DB(db_filename)

    def connection_made(self, transport):
        peername = transport.get_extra_info('peername')
        print('Connection from {}'.format(peername))
        self.transport = transport

    def data_received(self, data):
        message = data.decode()
        print('Data received: {!r}'.format(message))
        cmd_cls, param1, param2 = CommandFactory.get_cmd(message)
        res = cmd_cls.run(self.db, param1, param2)
        print('Sending response: {!r}'.format(res))
        self.transport.write(bytes(res, encoding='UTF-8'))

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    # Each client connection will create a new protocol instance
    coro = loop.create_server(Server, '127.0.0.1', 10888)
    server = loop.run_until_complete(coro)

    # Serve requests until Ctrl+C is pressed
    print('Serving on {}'.format(server.sockets[0].getsockname()))
    try:
        loop.run_forever()
    except KeyboardInterrupt:
        pass
    finally:
        # Close the server
        server.close()
        loop.run_until_complete(server.wait_closed())
        loop.close()

UPDATE :解决方案是使用aioconsole包和输入函数 . 使用aioconsole的代码(工作非常好) .

# server.py
import abc
import asyncio
from d_1_networking.esercizio_soluzione.SOversion.dummydb import DummyDB as DB


class Command(metaclass=abc.ABCMeta):
    @abc.abstractclassmethod
    def run(self, a, b, c):
        raise NotImplementedError()


class XCommand(Command):
    @classmethod
    def run(cls, db, param1=None, param2=None):
        res = db.x()
        if not res:
            return '>>>>>>>>>>> Empty response! <<<<<<<<<<<<<'
        return '\n'.join('{}: {}'.format(col, val) for col, val in res.items())


class YCommand(Command):
    @classmethod
    def run(cls, db, param1=None, param2=None):
        db.y(param1)
        return 'Operation Y OK: {}'.format(param1)


class QuitCommand(Command):
    @classmethod
    def run(cls, rubrica_db, param1=None, param2=None):
        return 'Disconnected...'

class CommandFactory:
    _cmds = {'X': XCommand,
         'Y': YCommand,
         'DISCONNECT': QuitCommand}

    @classmethod
    def get_cmd(cls, cmd):
        tokens = cmd.split(':')
        cmd = tokens[0]
        if len(tokens) == 1:
            nome, numero = None, None
        else:
            nome, numero = (tokens[1], tokens[2]) if len(tokens) == 3 else (tokens[1], None)
        cmd_cls = cls._cmds.get(cmd)
        return cmd_cls, nome, numero

class Server(asyncio.Protocol):
    db_filename = '../data/exercise.db'
    db = DB(db_filename)

    def connection_made(self, transport):
        peername = transport.get_extra_info('peername')
        print('Connection from {}'.format(peername))
        self.transport = transport

    def data_received(self, data):
        message = data.decode()
        print('Data received: {!r}'.format(message))
        cmd_cls, param1, param2 = CommandFactory.get_cmd(message)
        res = cmd_cls.run(self.db, param1, param2)
        print('Sending response: {!r}'.format(res))
        self.transport.write(bytes(res, encoding='UTF-8'))

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    # Each client connection will create a new protocol instance
    coro = loop.create_server(RubricaServer, '127.0.0.1', 10888)
    server = loop.run_until_complete(coro)

    # Serve requests until Ctrl+C is pressed
    print('Serving on {}'.format(server.sockets[0].getsockname()))
    try:
        loop.run_forever()
    except KeyboardInterrupt:
        pass

    # Close the server
    server.close()
    loop.run_until_complete(server.wait_closed())
    loop.close()

#dummydb.py
class DummyDB:
    def __init__(self, fn):
        self.fn = fn

    def x(self):
        return {'field_a': '55 tt TTYY 3334 gghyyujh',
            'field_b': 'FF hhhnneeekk',
            'field_c': '00993342489048222 news'}

    def y(self, param):
        return param

# client.py
import abc
from asyncio import *
from aioconsole import ainput

MENU = '''
---------------------------
A) Command X
B) Command Y (require additional input)
C) Quit program
---------------------------
'''

loop_ = get_event_loop()


class Command(metaclass=abc.ABCMeta):
    asyn = False

    def __init__(self, tcp_client):
        self.client = tcp_client

    @abc.abstractmethod
    def run(self):
        raise NotImplementedError()


class ACommand(Command):
    def run(self):
        # send X command to server
        self.client.send_data_to_tcp('X:')


class BCommand(Command):
    asyn = True
    async def run(self):
        s = await ainput('Insert data for B operation (es. name:43d3HHte3) > ')
        # send Y command to server
        self.client.send_data_to_tcp('Y:' + s)


class QuitCommand(Command):
    def run(self):
        self.client.send_data_to_tcp('DISCONNECT:')
        print('Goodbye!!!')
        self.client.disconnect()
        exit()


class CommandFactory:
    _cmds = {'A': ACommand,
         'B': BCommand,
         'C': QuitCommand}

    @classmethod
    def get_cmd(cls, cmd):
        cmd = cmd.strip()
        cmd_cls = cls._cmds.get(cmd)
        return cmd_cls


class Client(Protocol):
    def __init__(self, loop):
        self.loop = loop
        self.transport = None

    def disconnect(self):
        self.loop.stop()

    def connection_made(self, transport):
        self.transport = transport

    def data_received(self, data):
        print('Data received from server: \n===========\n{}\n===========\n'.format(data.decode()), flush=True)

    def send_data_to_tcp(self, data):
        self.transport.write(data.encode())

    def connection_lost(self, exc):
        print('The server closed the connection')
        print('Stop the event loop')
        self.loop.stop()


def menu():
    print(MENU)


async def main():
    menu()
    while True:
        cmd = await ainput('Insert Command >')
        cmd_cls = CommandFactory.get_cmd(cmd)
        if not cmd_cls:
            print('Unknown: {}'.format(cmd))
        elif cmd_cls.asyn:
            await cmd_cls(client).run()
        else:
            cmd_cls(client).run()


if __name__ == '__main__':
    client = Client(loop_)
    coro = loop_.create_connection(lambda: client, '127.0.0.1', 10888)
    loop_.run_until_complete(coro)
    loop_.run_until_complete(main())

1 回答

  • 6

    您可以考虑使用aioconsole.ainput

    from aioconsole import ainput
    
    async def some_coroutine():
        line = await ainput(">>> ")
        [...]
    

    该项目可在PyPI获取 .

相关问题