这节讲解基于 Netty 快速实现一个聊天小程序。

一、服务端

1. SimpleChatServerHandler(处理器类)

  该类主要实现了接收来自客户端的消息并转发给其他客户端。

 /**
  * 服务端处理器
  */
 public class SimpleChatServerHandler extends SimpleChannelInboundHandler<String> {
     public static ChannelGroup channels 
         = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
     
     /**
      * 收到新的客户端连接时调用
      * 将客户端channel存入列表,并广播消息
      */
     @Override
     public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
         Channel incoming = ctx.channel();
         // 广播加入消息
         channels.writeAndFlush("[SERVER] - " + incoming.remoteAddress() + " 加入\n");
         channels.add(incoming);        // 存入列表    
     }
     
     /**
      * 客户端连接断开时调用
      * 广播消息
      */
     @Override
     public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
         Channel incoming = ctx.channel();
         // 广播离开消息
         channels.writeAndFlush("[SERVER] - " + incoming.remoteAddress() + " 离开\n");
         // channel会自动从ChannelGroup中删除 
     }
     
     /**
      * 收到消息时调用
      * 将消息转发给其他客户端
      */
     @Override
     protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
         Channel incoming = ctx.channel();
         for(Channel channel : channels) {        // 遍历所有连接的客户端
             if(channel != incoming) {            // 其他客户端
                 channel.writeAndFlush("[" + incoming.remoteAddress() + "] " + msg + "\n" );
             } else {                            // 自己
                 channel.writeAndFlush("[you] " + msg + "\n" );
             }
         }
     }
     
     /**
      * 监听到客户端活动时调用
      */
     @Override
     public void channelActive(ChannelHandlerContext ctx) throws Exception {
         Channel incoming = ctx.channel();
         System.out.println("SimpleChatClient: " + incoming.remoteAddress() + " 在线");
     }
     
     /**
      * 监听到客户端不活动时调用
      */
     @Override
     public void channelInactive(ChannelHandlerContext ctx) throws Exception {
         Channel incoming = ctx.channel();
         System.out.println("SimpleChatClient: " + incoming.remoteAddress() + " 掉线");
     }
     
     /**
      * 当Netty由于IO错误或者处理器在处理事件抛出异常时调用
      * 关闭连接
      */
     @Override
     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
         Channel incoming = ctx.channel();
         System.out.println("SimpleChatClient: " + incoming.remoteAddress() + " 异常");
     }
 }

2. SimpleChatServerInitializer(配置 Channel 类)

  该类添加分隔符协议处理类,解码、编码器还有自定义处理器。

 /**
  * 服务器配置初始化
  * 添加多个处理器
  */
 public class SimpleChatServerInitializer extends ChannelInitializer<SocketChannel> {
 
     @Override
     protected void initChannel(SocketChannel ch) throws Exception {
         ChannelPipeline pipeline = ch.pipeline();
         // 添加处理类
         // 使用'\r''\n'分割帧
         pipeline.addLast("framer", 
                 new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter()));
         // 解码、编码器
         pipeline.addLast("decoder", new StringDecoder());
         pipeline.addLast("encoder", new StringEncoder());
         // 处理器
         pipeline.addLast("handler", new SimpleChatServerHandler());
         
         System.out.println("SimpleChatClient: " + ch.remoteAddress() + "连接上");
     }
 
 }

3. SimpleChatServer(服务端主程序)

  启动服务端。

 /**
  * 服务端 main 启动
  */
 public class SimpleChatServer {
     private int port;        // 端口
     
     public SimpleChatServer(int port) {
         this.port = port;
     }
     
     // 配置并开启服务器
     public void run() throws Exception {
         EventLoopGroup bossGroup = new NioEventLoopGroup();        // 用来接收进来的连接
         EventLoopGroup workerGroup = new NioEventLoopGroup();    // 用来处理已接收的连接
         
         try {
             ServerBootstrap sb = new ServerBootstrap();            // 启动NIO服务的辅助启动类
             sb.group(bossGroup, workerGroup)
                 .channel(NioServerSocketChannel.class)                // 设置如何接受连接
                 .childHandler(new SimpleChatServerInitializer())    // 配置Channel
                 .option(ChannelOption.SO_BACKLOG, 128)                // 设置缓冲区
                 .childOption(ChannelOption.SO_KEEPALIVE, true);    // 启用心跳机制
             
             System.out.println("SimpleChatServer 启动了");
             ChannelFuture future = sb.bind(port).sync();        // 绑定端口,开始接收连接
             future.channel().closeFuture().sync();                // 等待关闭服务器(不会发生)
         } finally {
             workerGroup.shutdownGracefully();
             bossGroup.shutdownGracefully();
             System.out.println("SimpleChatServer 关闭了");
         }
     }
     
     public static void main(String[] args) throws Exception {
         int port = 8080;
         new SimpleChatServer(port).run();     // 开启服务器
     }
 }

二、客户端

1. SimpleChatClientHandler(处理器类)

  直接输出收到的消息。

 /**
  * 客户端处理类
  * 直接输出收到的消息
  */
 public class SimpleChatClientHandler extends SimpleChannelInboundHandler<String> {
 
     @Override
     protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
         System.out.println(msg);    // 直接输出消息        
     }
 
 }

2. SimpleChatClientInitializer(配置 Channel 类)

  与服务端类似。

 /**
  * 客户端配置初始化
  * 与服务端类似
  */
 public class SimpleChatClientInitializer extends ChannelInitializer<SocketChannel> {
 
     @Override
     protected void initChannel(SocketChannel ch) throws Exception {
         ChannelPipeline pipeline = ch.pipeline();
         // 添加处理类
         // 使用'\r''\n'分割帧
         pipeline.addLast("framer", 
                 new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter()));
         // 解码、编码器
         pipeline.addLast("decoder", new StringDecoder());
         pipeline.addLast("encoder", new StringEncoder());
         // 处理器
         pipeline.addLast("handler", new SimpleChatClientHandler());
     }
 
 }

3. SimpleChatClient(客户端主程序)

  接收来自控制台的消息,每帧以 "\r\n" 结尾,再发给服务端。

 /**
  * 客户端
  * 开启客户端,接收控制台输入并发送给服务端
  */
 public class SimpleChatClient {
     private final String host;        // IP
     private final int port;        // 端口
     
     public SimpleChatClient(String host, int port) {
         this.host = host;
         this.port = port;
     }
     
     // 配置并运行客户端
     public void run() throws Exception {
         EventLoopGroup group = new NioEventLoopGroup();
         try {
             Bootstrap b = new Bootstrap();        // 客户端辅助启动类
             b.group(group)                                    // 客户端只需要一个用来接收并处理连接
                 .channel(NioSocketChannel.class)            // 设置如何接受连接
                 .handler(new SimpleChatClientInitializer());// 配置 channel
             // 连接服务器
             Channel channel = b.connect(host, port).sync().channel();
             // 读取控制台输入字符
             BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
             while(true) {
                 // 每行成一帧输出,以"\r\n"结尾
                 channel.writeAndFlush(in.readLine() + "\r\n");
             }
         } catch (Exception e) {
             e.printStackTrace();        // 输出异常
         } finally {
             group.shutdownGracefully();    // 关闭
         }
     }
     
     public static void main(String[] args) throws Exception {
         new SimpleChatClient("localhost", 8080).run();        // 启动客户端
     }
 
 }

三、运行效果

  先运行服务端程序,然后在运行两次客户端程序,如下:
  服务端输出:  
图片描述
  首先连接的客户端输出:  
图片描述

  随便选个客户端在控制台输出信息并回车,如下:
   自身输出:

  
图片描述

  另一客户端输出:  
图片描述

  以上……