一、前言

上一篇文章我们提到 Netty 的核心组件是 Channel、回调、Future、ChannelHandler、EventLoop,这篇文章主要是对 Channel (Netty传入和传出数据的载体)做一些详细的讲解,以及介绍下 Netty 内置的传输类型。

二、传输的核心

传输 API 的核心是 interface Channel ,她被用于所有的 I/O 操作。Channel 类的层次结构如图所示:

图片描述

如图,每个Channel 都会被分配一个 ChannelPipeline 和 ChannelConfig,ChannelConfig 包含了该 Channel 的所有配置设置,并且支持热更新。ChannelPipeline 是 ChannelHandler链的容器,持有所有入站和出站数据以及ChannelHandler 实例。

由于 Channel 是独一无二的,所以为了保证顺序将 Channel 声明为java.lang.Compareable的一个子接口,因此每个Channel都有不同的散列码,否则会报Error。

Netty的 Channel 实现是线程安全的,因此你可以存储一个Channel的引用,并且每当你需要向远程节点写数据时,都可以使用它,即使当时许多线程都在使用它。

Channel 的其他方法如下:

图片描述

tips:

1、ChannelHandler 的典型用途:

-- 将数据从一种格式转换为另一种格式。
-- 提供异常的通知。
-- 提供Channel 变为活动的或者非活动的通知。
-- 提供当Channel 注册到 EventLoop 或者 从 EventLoop 注销时的通知。
-- 提供有关用户自定义事件的通知。

2、Netty 所提供的广泛功能只依赖于少量的接口。这意味着,你可以对你的应用程序逻辑进行重大的修改,而无需大规模的重构你的代码库。

三、Netty 内置的传输类型

Netty 内置了一些可开箱即用的传输。因为并不是它们所有的传输都支持每一种协议,所以你必须选择一个和你的应用程序所使用的协议都相容的传输。

名称描述应用场景
NIOio.netty.channel.socket.nio使用java.nio.channels 包作为基础——基于选择器的方式非阻塞I/O使用
Epollio.netty.channel.epoll由 JNI 驱动的 epoll()和非阻塞 IO。 这个传输支持只有在 Linux上可用的多种特性,如SO_REUSEPORT,比 NIO 传输更快, 而且是完全非阻塞的Linux上的非阻塞 I/O 使用
OIOio.netty.channel.socket.oio使用 java.net 包作为基础——使用阻塞流阻塞 I/O 使用
Localio.netty.channel.local可以在 VM 内部通过管道进行通信的本地传输客户端和服务端都使用同个JVM通信
Embeddedio.netty.channel.embeddedEmbedded 传输,允许使用 ChannelHandler 而又不需要一个真正的基于网络的传输。这在测试你的ChannelHandler 实现时非常有用测试 ChannelHandler 的实现

1、NIO — 非阻塞I/O

Java NIO 提供了一个所有I/O操作的全异步实现。其中,选择器的背后实际上是充当了一个注册表,如图展示了该处理流程:

图片描述

对于所有Netty的传输实现都共有的用户级别API完全隐藏了Java NIO的实现细节,如上一篇展示的Demo一样,Netty 这样使用Java NIO:

图片描述

2、Epoll—用于 Linux 的本地非阻塞传输

Netty为Linux提供了一组NIO API, 其以一种和它本身的设计更加一致的方式使用epoll,并且以一种更加轻量的方式使用中断。 如果你的应用程序旨在运行于Linux系统, 那么请考虑利用这个版本的传输;你将发现在高负载下它的性能要优于JDK的NIO实现。

Netty 在代码中支持 Epoll 也非常简单,只需做如下的转改变:

图片描述

3、OIO—旧的阻塞 I/O
Netty是如何能够使用和用于异步传输相同的API来支持OIO的呢?上文提到,在NIO中,一个 EventLoop 对应一个线程,一个Channel 绑定一个 EventLoop,而一个EventLoop 可以绑定多个Channel 来实现异步,也就是说一个线程可以处理多个 Channel。而OIO中,一个 EventLoop 仅绑定一个 Channel,也就是说每个线程只处理一个Channel ,这就有点像传统IO中,在服务端(ServerSocket)写了一个多线程来处理客户端的并发请求。现在还有一个问题,channel是双向的,既可以读,也可以写。而stream是单向的,OIO中利用 InputStream 来读,OutputStream 来写。那么Channel 是如何实现阻塞的读和写的呢?答案就是, Netty利用了SO_TIMEOUT这个Socket标志,它指定了等待一个I/O操作完成的最大毫秒数,I/O 操作期间Channel是阻塞的,如果操作在指定的时间间隔内没有完成,则将会抛出一个SocketTimeout Exception。 Netty将捕获这个异常并继续处理循环。在EventLoop下一次运行时,它将再次尝试。这实际上也是类似于Netty这样的异步框架能够支持OIO的唯一方式。Netty 在代码中支持 OIO,也和NIO类似:
图片描述

tips:

我从硬盘读取数据,然后程序一直等,数据读完后,继续操作。这种方式是最简单的,叫 阻塞IO。
我从硬盘读取数据,然后程序继续向下执行,等数据读取完后,通知当前程序(对硬件来说叫中断,对程序来说叫回调),然后此程序可以立即处理数据,也可以执行完当前操作在读取数据。叫 非阻塞IO。
4、Local —— 用于 JVM 内部通信的 Local 传输Netty 提供了一个Local传输, 用于在同一个 JVM 中运行的客户端和服务器程序之间的异步通信。在这个传输中,和服务器 Channel 相关联的 SocketAddress 并没有绑定物理网络地址;相反,只要服务器还在运行, 它就会被存储在注册表里,并在 Channel 关闭时注销。 因为这个传输并不接受真正的网络流量,所以它并不能够和其他传输实现进行互操作。因此,客户端希望连接到(在同一个 JVM 中)使用了这个传输的服务器端时也必须使用它。服务端代码:
图片描述

图片描述

public void server() throws InterruptedException {
        final EchoServerHandler serverHandler = new EchoServerHandler();
        EventLoopGroup group = new DefaultEventLoop();
        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(group, group)
                    .channel(LocalServerChannel.class)
                    .childHandler(new ChannelInitializer<LocalChannel>() {
                        @Override
                        protected void initChannel(LocalChannel ch) {
                            ChannelPipeline pipeline = ch.pipeline();
                            pipeline.addLast(serverHandler);
                        }
                    });
            ChannelFuture channelFuture = bootstrap.bind(new LocalAddress("foo")).sync();
            System.out.println(EchoServer.class.getName() + "--started and listening for connections on--" + channelFuture.channel().localAddress());
            channelFuture.channel().closeFuture().sync();

        } finally {
            group.shutdownGracefully().sync();
        }
    }

View Code客户端代码:
图片描述

图片描述

public void client() throws InterruptedException {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(group)
                    .channel(LocalChannel.class)
                    .handler(new ChannelInitializer<LocalChannel>() {
                        @Override
                        protected void initChannel(LocalChannel ch) throws Exception {
                            ch.pipeline().addLast(new EchoClientHandler());
                        }
                    });
            ChannelFuture channelFuture = bootstrap.connect(new LocalAddress("foo")).sync();
            channelFuture.channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully().sync();
        }
    }

View Code
备注:现在客户端和服务端的连接一直报一个异常,查了很多资料,也看了 Github 上的诸多demo,仍然没有解决。有没有大神帮我解答下?

5、Embedded
Netty 提供了一种额外的传输, 使得你可以将一组 ChannelHandler 作为帮助器类嵌入到其他的 ChannelHandler 内部。 通过这种方式,你将可以扩展一个 ChannelHandler 的功能,而又不需要修改其内部代码。
Embedded 传输的关键是一个被称为 EmbeddedChannel 的具体的Channel实现。如果你想要为自己的 ChannelHandler 实现编写单元测试, 那么请考虑使用 Embedded 传输。

参考资料:《Netty IN ACTION》

演示源代码:https://github.com/JMCuixy/NettyDemo