Netty 系列三(ByteBuf).

一、概述和原理

网络数据传输的基本单位总是字节,Netty 提供了 ByteBuf 作为它的字节容器,既解决了 JDK API 的局限性,又为网络应用程序提供了更好的 API,ByteBuf 的优点:

1、可以被用户自定义的缓冲区类型扩展
2、通过内置的复合缓冲区类型实现了透明的零拷贝
3、容量可以按需增长
4、在读和写这两种模式之间切换不需要调用 ByteBuffer 的 flip()方法
5、读和写使用了不同的索引
6、支持方法的链式调用
7、支持引用计数
8、支持池化

ByteBuf通过两个索引(readerIndex、writerIndex)划分为三个区域:

图片描述

1、任何名称以 read 或者 skip 开头的操作都将检索或者跳过位于当前readerIndex 的数据,并且将它增加已读字节数;任何名称以 write 开头的操作都将从当前的 writerIndex 处开始写数据,并将它增加已经写入的字节数。readerIndex 不能超过 writerIndex。

2、如果被调用的方法需要一个 ByteBuf 参数作为读取的目标,并且没有指定目标索引参数,那么该目标缓冲区的 writerIndex 也将被增加;如果写操作的目标也是 ByteBuf,并且没有指定源索引的值,则源缓冲区的 readerIndex 也同样会被增加相同的大小。

3、如果尝试在缓冲区的可读字节数已经耗尽时从中读取数据, 那么将会引发一个IndexOutOfBoundsException;如果尝试往目标写入超过目标容量的数据,将检查当前的写索引以及最大容量是否可以在扩展后容纳该数据,可以的话就会分配并调整容量,否则将会引发一个IndexOutOfBoundException。

4、通过调用 discardReadBytes()方法, 可以丢弃已读字节并回收空间。但不建议频繁的调用discardReadBytes(),因为将可能导致内存复制:

图片描述

5、通过调用clear()方法来将readerIndex和writerIndex都设置为0。 注意,调用clear()比调用discardReadBytes()轻量得多, 因为它将只是重置索引而不会复制任何的内存。

图片描述

二、ByteBuf 分配

ByteBuf 是一个抽象类,在程序中直接new ByteBuf() 是不现实的啦!毕竟还要实现一堆的方法,并且缺乏池化的管理。那么,在 Netty 中要怎么得到 ByteBuf 呢?

有两种方法可以得到 ByteBuf 实例,一种是ByteBufAllocator (实现了池化,有效的降低了分配和释放内存的开销),另一种是Unpooled (Netty 提供的工具类来创建未池化的ByteBuf 实例)。
ByteBufAllocator:Netty 提供了两种ByteBufAllocator 的实现,PooledByteBufAllocator(默认使用)和UnpooledByteBufAllocator。前者池化了ByteBuf的实例以提高性能并最大限度地减少内存碎片;后者的实现不池化ByteBuf实例, 并且在每次它被调用时都会返回一个新的实例。
创建 ButeBuf 实例的代码如下:

public void createBuf() {
    //获得ByteBufAllocator 的两种方式
    //1、
    Channel channel = null;
    ByteBufAllocator alloc = channel.alloc();
    //2、
    ChannelHandlerContext channelHandlerContext = null;
    ByteBufAllocator alloc1 = channelHandlerContext.alloc();
    
    //ByteBufAllocator 创建 ByteBuf 实例
    //1、返回一个基于堆或者直接内存存储的ByteBuf
    ByteBuf byteBuf = alloc.buffer(256, Integer.MAX_VALUE);
    //2、返回一个基于堆内存存储的 ByteBuf 实例
    ByteBuf byteBuf1 = alloc.heapBuffer(256);

    byteBuf1.refCnt();//检查该ByteBuf的引用计数
    byteBuf1.release();//将ByteBuf的引用计数设为0并释放

    //3、返回一个基于直接内存存储的 ByteBuf
    ByteBuf byteBuf2 = alloc.directBuffer();
    //4、返回一个可以通过添加最大到指定数目的基于堆的或者直接内存存储的缓冲区来扩展的 CompositeByteBuf
    CompositeByteBuf compositeByteBuf = alloc.compositeBuffer();
    CompositeByteBuf compositeByteBuf1 = alloc.compositeHeapBuffer(16);
    CompositeByteBuf compositeByteBuf2 = alloc.compositeDirectBuffer(16);
    //5、返回一个用于套接字的 I/O 操作的ByteBuf
    ByteBuf byteBuf3 = alloc.ioBuffer();
    
 
    //Unpooled 创建 ByteBuf 实例
    //1、创建一个未池化的基于堆内存存储的 ByteBuf 实例
    ByteBuf buf = Unpooled.buffer();
    //2、创建一个未池化的基于内存存储的ByteBuf
    ByteBuf buf1 = Unpooled.directBuffer(256, Integer.MAX_VALUE);
    //3、返回一个包装了给定数据的 ByteBuf
    Unpooled.wrappedBuffer("Hello Netty".getBytes());
    //4、返回一个复制了给定数据的 ByteBuf
    Unpooled.copiedBuffer("Hello Netty",CharsetUtil.UTF_8);
}

三、ByteBuf 操作

涉及到 ByteBuf 的操作主要是两个方面,一个是 ByteBuf 的读/写,另一个是 ByteBuf 的复制分片操作。

ByteBuf 的读/写操作操作有两种类别:get() 和 set() 操作,从给定的索引开始,并且保持索引不变,也就是说get() 和 set() 操作并不会改变 readerIndex 和 writerIndex 的值;read()和 write()操作, 从给定的索引开始,并且会根据已经访问过的字节数对索引进行调整,比如 read() 操作 readerIndex 会根据读取的数据类型(byte 1个字节,short 2个字节,int 4个字节,long 8个字节)增加对应的索引数。

ByteBuf 提供了专门的方式来实现复制分片操作:

duplicate();
slice();
slice(int, int);
Unpooled.unmodifiableBuffer(…);
order(ByteOrder);
readSlice(int)。

这些方法都将返回一个新的 ByteBuf 实例,它具有自己的读索引、写索引和标记索引。其内部存储和 JDK 的 ByteBuffer 一样也是共享的。这意味着,如果你修改了它的内容,也同时修改了其对应ByteBuf的源实例。

如果需要一个现有缓冲区的真实副本,请使用 copy()或者 copy(int, int)方法。不同于派生缓冲区,由这个调用所返回的 ByteBuf 拥有独立的数据副本。

现在我们具体来看看 ByteBuf 的操作:

1、数据遍历

for (int i = 0; i < byteBuf.capacity(); i++) {
    byte aByte = byteBuf.getByte(i);
    System.out.print((char) aByte);
}

while (byteBuf.isReadable()){
    System.out.print((char) byteBuf.readByte());
}

2、写入数据

while (byteBuf.writableBytes() >= 4){
    byteBuf.writeByte(65);
}

3、索引标记管理

ByteBuf buf = byteBuf.readerIndex(0);//将 readerIndex 移动到指定的位置
buf.markReaderIndex();//标记当前的 readerIndex
while (buf.isReadable()){
    System.out.print((char) buf.readByte());
}
buf.resetReaderIndex();//回退到之前标记的 readerIndex
while (buf.isReadable()){
    System.out.print((char) buf.readByte());
}

int index = byteBuf.indexOf(0, byteBuf.capacity() - 1, (byte) 65);//在某个范围内查找某个字节的索引

4、分片操作

ByteBuf slice = byteBuf.slice(0, 15);
System.out.print(slice.toString(CharsetUtil.UTF_8));
//更新索引0处的字节
slice.setByte(0, (byte) 'J');
byte aByte = byteBuf.getByte(0);
System.out.print("\r\n" + (char)aByte);

5、复制操作

ByteBuf copy = buf.copy(0, 15);
System.out.println(copy.toString(CharsetUtil.UTF_8));
copy.setByte(0, (byte) 'A');
System.out.println((char) byteBuf.getByte(0));

6、其他操作

System.out.println("如果至少有一个字节可读取:" + byteBuf.isReadable());
System.out.println("如果至少有一个字节可写入:" + byteBuf.isWritable());
System.out.println("返回可被读取的字节数:" + byteBuf.readableBytes());
System.out.println("返回可被写入的字节数:" + byteBuf.writableBytes());
System.out.println("可容纳的字节数:" + byteBuf.capacity() + ",可扩展最大的字节数:" + byteBuf.maxCapacity());
System.out.println("是否由一个字节数组支撑:" + byteBuf.hasArray());
System.out.println("由一个字节数组支撑则返回该数组:" + byteBuf.array().length);
System.out.println("计算第一个字节的偏移量:" + byteBuf.arrayOffset());
System.out.println("返回Bytebuf的十六进制:" + ByteBufUtil.hexDump(byteBuf.array()));

四、补充

ByteBuf 的使用模式 :1、堆缓冲区:最常用的 ByteBuf 模式是将数据存储在 JVM 的堆空间中。 这种模式被称为支撑数组(backing array), 它能在没有使用池化的情况下提供快速的分配和释放。
2、直接缓冲区:将数据驻留在会被垃圾回收的堆之外,直接缓冲区对于网络数据传输是最理想的选择,不过,相对于基于堆的缓冲区,它们的分配和释放都较为昂贵。另外,如果你的数据包含在一个在堆上分配的缓冲区中, 那么事实上,在通过套接字发送它之前, JVM将会在内部把你的缓冲区复制到一个直接缓冲区中。经验表明,Bytebuf的最佳实践是在IO通信线程的读写缓冲区使用DirectByteBuf,后端业务使用HeapByteBuf。
3、复合缓冲区:为多个 ByteBuf 提供一个聚合视图。 在这里你可以根据需要添加或者删除 ByteBuf 实例。Netty 通过一个 ByteBuf 子类——CompositeByteBuf——实现了这个模式, 它提供了一个将多个缓冲区表示为单个合并缓冲区的虚拟表示。使用 CompositeByteBuf 的复合缓冲区模式:

public void CompositeBuffer() {
    CompositeByteBuf messageBuf = Unpooled.compositeBuffer();
    ByteBuf headBuf = Unpooled.copiedBuffer("Hello,", CharsetUtil.UTF_8);
    ByteBuf bodyBuf = Unpooled.copiedBuffer("Netty!", CharsetUtil.UTF_8);
    //将 ByteBuf 实例追加到 CompositeByteBuf
    messageBuf.addComponents(headBuf, bodyBuf);
    Iterator<ByteBuf> it = messageBuf.iterator();
    //访问CompositeByteBuf数据
    while(it.hasNext()){
        ByteBuf buf = it.next();
        while (buf.isReadable()){
            System.out.print((char) buf.readByte());
        }
    }
    //使用数组访问数据
    if(!messageBuf.hasArray()){
        int len = messageBuf.readableBytes();
        byte[] arr = new byte[len];
        messageBuf.getBytes(0, arr);
        for (byte b : arr){
            System.out.print((char)b);
        }
    }
    messageBuf.removeComponent(0); //删除位于索引位置为 0(第一个组件)的 ByteBuf
}

参考资料:《Netty IN ACTION》

演示源代码:https://github.com/JMCuixy/NettyDemo/blob/master/src/main/java/org/netty/demo/bytebuf/ByteBufOperation.java