功能介绍 Java 本身提供了 ByteBuffer 类,为什么 Netty 还要搞一个 ByteBuf 类呢?因为 ByteBuffer 类有着许多缺点: - ByteBuffer 长度固定,无法动态伸缩。
- ByteBuffer 只有一个位置指针 position,读写的时候需要手工调用 flip 和 rewind 方法进行模式转换,操作繁琐,容易出错。
- 功能太少,缺少一些高级特性。
为了弥补这些不足,Netty 提供了自己的缓冲区类实现 ByteBuf。有什么特点呢? - 两个位置指针协助缓冲区的读写操作:readerIndex、writerIndex,读写之间不需要调整指针位置,大大简化了读写操作。
- 可以动态扩容。
- 当有部分内容已经读取完成时,可以通过 discard 操作对缓冲区进行整理,在不重新申请内存的情况下,增大可写字节数目。
- 支持标记和回滚的功能。
- 支持在 ByteBuf 中查找某个字符串。
- 派生出另一个 ByteBuf:duplicate、copy、slice。
- 转化成标准的 ByteBuffer,这是因为在使用 NIO 进行网络读写时,操作的对象还是 JDK 标准的 ByteBuffer。
- 随机读写。
源码分析继承关系ByteBuf 的主要功能类继承关系如下图所示: 从内存分配的角度看,ByteBuf 可以分为两类:
1. 堆内存字节缓冲区 HeapByteBuf:优点是内存分配和回收速度快,可以被 JVM 自动回收。缺点是,如果进行 Socket 的 I/O 读写,需要额外做一次内存复制,在堆内存缓冲区和内核 Channel 之间进行复制,性能会有一定程度下降。
2. 直接内存字节缓冲区 DirectByteBuf:非堆内存,直接在堆外进行分配。相比于堆内存,内存分配和回收稍慢,但是可以减少复制,提升性能。 两种内存,各有利弊。Netty 最佳实践表明:在 I/O 通信线程的读写缓冲区使用 DirectByteBuf,后端业务消息的编解码模块使用 HeapByteBuf,这样组合可以达到性能最优。 从内存回收的角度看,ByteBuf 也分为两类:基于对象池的 ByteBuf 和普通 ByteBuf。两者区别在于基于对象池的 ByteBuf 可以重用 ByteBuf 对象,它自己维护了一个内存池,可以循环利用创建的 ByteBuf,提升内存的使用效率,降低由于高负载导致的频繁 GC。内存池的缺点是管理和维护比较复杂,使用时需要更加谨慎。 下面我们对一些关键类进行分析和解读。 AbstractByteBufAbstractByteBuf 都做了哪些事儿呢?我们先看一下其主要的成员变量:
1. 读写指针。
2. 用于标记回滚的 marked 读写指针。
3. 最大容量 maxCapacity,用于进行内存保护。
4. 与本 ByteBuf 大小端属性相反的 ByteBuf:SwappedByteBuf。 我们发现这里没有真正存储数据的数据结构,例如 byte 数组或 DirectByteBuffer,原因是这里还不知道子类是要基于堆内存还是直接内存。
public abstract class AbstractByteBuf extends ByteBuf { static final ResourceLeakDetector<ByteBuf> leakDetector = new ResourceLeakDetector<ByteBuf>(ByteBuf.class);
int readerIndex; int writerIndex; private int markedReaderIndex; private int markedWriterIndex;
private int maxCapacity;
private SwappedByteBuf swappedBuf; }
接下来我们看看读操作 readBytes 方法,AbstractByteBuf 类做了什么呢? 1. 在 checkReadableBytes 方法中,检查入参有效性。 2. 修改读指针 readerIndex。
@Override public ByteBuf readBytes(byte[] dst, int dstIndex, int length) { checkReadableBytes(length); // getBytes 方法未在 AbstractByteBuf 中实现 getBytes(readerIndex, dst, dstIndex, length); readerIndex += length; return this; }
protected final void checkReadableBytes(int minimumReadableBytes) { ensureAccessible(); if (minimumReadableBytes < 0) { throw new IllegalArgumentException("minimumReadableBytes: " + minimumReadableBytes + " (expected: >= 0)"); } if (readerIndex > writerIndex - minimumReadableBytes) { throw new IndexOutOfBoundsException(String.format( "readerIndex(%d) + length(%d) exceeds writerIndex(%d): %s", readerIndex, minimumReadableBytes, writerIndex, this)); } }
从当前 ByteBuf 中复制数据到 dst 是在 getBytes 方法中,该方法未在 AbstractByteBuf 中实现,也是因为此时具体如何存储数据尚不确定。
下面我们看一下写操作 writeBytes 方法,AbstractByteBuf 负责实现了哪些操作呢? 1. 有效性检查,如果引用计数 refCnt 为 0,表示该 ByteBuf 已经被回收,不能再写入。 2. 输入参数有效性检查:要写入的数据量不能小于 0,写入之后总数据量也不能大于最大容量。 3. 当容量不足时,如果尚未超过最大容量,则进行扩容。 4. 修改写指针。
@Override public ByteBuf writeBytes(byte[] src, int srcIndex, int length) { ensureAccessible(); ensureWritable(length); // setBytes 交给子类实现。 setBytes(writerIndex, src, srcIndex, length); writerIndex += length; return this; }
protected final void ensureAccessible() { if (refCnt() == 0) { throw new IllegalReferenceCountException(0); } }
@Override public ByteBuf ensureWritable(int minWritableBytes) { if (minWritableBytes < 0) { throw new IllegalArgumentException(String.format( "minWritableBytes: %d (expected: >= 0)", minWritableBytes)); }
if (minWritableBytes <= writableBytes()) { return this; }
if (minWritableBytes > maxCapacity - writerIndex) { throw new IndexOutOfBoundsException(String.format( "writerIndex(%d) + minWritableBytes(%d) exceeds maxCapacity(%d): %s", writerIndex, minWritableBytes, maxCapacity, this)); }
// Normalize the current capacity to the power of 2. int newCapacity = alloc().calculateNewCapacity(writerIndex + minWritableBytes, maxCapacity);
// 具体扩容操作由子类实现。 capacity(newCapacity); return this; }
在读写操作中,AbstractByteBuf 主要负责参数校验、读写指针修改,以及写操作时的扩容计算。
除此之外,AbstractByteBuf 还提供了以下功能: 1. 操作索引:修改读写指针、mark & reset。 2. 重用缓冲区:discardReadBytes。 3. 丢弃部分数据:skipBytes。因为丢弃时,只需要修改读指针即可,与数据具体如何存储无关。
总结:在 AbstractByteBuf 中实现的是各个子类中通用的功能。
AbstractReferenceCountedByteBuf
从类名可以看出来,该类主要提供引用计数的功能,类似于 JVM 内存回收的对象引用计数器,用于跟踪对象的分配和回收,实现手动控制内存回收。
首先,我们看一下其成员变量: 1. refCnt:记录对象引用次数。 2. refCntUpdater:用于对 refCnt 进行原子更新。
private static final AtomicIntegerFieldUpdater<AbstractReferenceCountedByteBuf> refCntUpdater;
static { AtomicIntegerFieldUpdater<AbstractReferenceCountedByteBuf> updater = PlatformDependent.newAtomicIntegerFieldUpdater(AbstractReferenceCountedByteBuf.class, "refCnt"); if (updater == null) { updater = AtomicIntegerFieldUpdater.newUpdater(AbstractReferenceCountedByteBuf.class, "refCnt"); } refCntUpdater = updater; }
private volatile int refCnt = 1;
接下来,我们看一下增加引用计数的 retain 方法。该方法是用 CAS 操作对 refCnt 进行加 1。另外,refCnt 值为 0 或 Integer.MAX_VALUE 值不能再操作,会抛出异常。
@Override public ByteBuf retain() { for (;;) { int refCnt = this.refCnt; if (refCnt == 0) { throw new IllegalReferenceCountException(0, 1); } if (refCnt == Integer.MAX_VALUE) { throw new IllegalReferenceCountException(Integer.MAX_VALUE, 1); } if (refCntUpdater.compareAndSet(this, refCnt, refCnt + 1)) { break; } } return this; }
另一个 release 方法表示释放资源,会将引用计数 refCnt 减 1,如果当前 refCnt 等于 1,减 1 之后等于 0,表示对象已经没有被引用,可以被回收了,会调用 deallocate 方法释放内存。
@Override public final boolean release() { for (;;) { int refCnt = this.refCnt; if (refCnt == 0) { throw new IllegalReferenceCountException(0, -1); }
if (refCntUpdater.compareAndSet(this, refCnt, refCnt - 1)) { if (refCnt == 1) { deallocate(); return true; } return false; } } }
在 UnpooledHeapByteBuf 中,释放内存仅仅是把 array 数组置为 null,剩下的内存回收工作交由 JVM 来完成。
// in UnpooledHeapByteBuf.java private byte[] array; @Override protected void deallocate() { array = null; }
在 UnpooledDirectByteBuf 中,则是调用 PlatformDependent.freeDirectBuffer 来释放直接内存。
// in UnpooledDirectByteBuf.java @Override protected void deallocate() { ByteBuffer buffer = this.buffer; if (buffer == null) { return; }
this.buffer = null;
if (!doNotFree) { freeDirect(buffer); } }
protected void freeDirect(ByteBuffer buffer) { PlatformDependent.freeDirectBuffer(buffer); }
UnpooledHeapByteBuf
UnpooledHeapByteBuf 是基于堆内存进行内存分配的字节缓冲区,它没有基于对象池实现,意味着每次 I/O 读写都会创建一个新的 UnpooledHeapByteBuf 对象,频繁进行内存的分配和释放对性能会有一定的影响,但是相对堆外内存的申请和释放,成本稍低。
相比于 PooledHeapByteBuf,不需要自己管理内存池,不容易出现内存管理方面的问题,更容易使用和维护。因此,在满足性能的情况下,推荐使用 UnpooledHeapByteBuf。
首先看一下 UnpooledHeapByteBuf 的成员变量: 1. 负责内存分配的 ByteBufAllocator。 2. 缓冲区实现 byte 数组。 3. 从 ByteBuf 到 NIO 的 ByteBuffer 的转换对象 tmpNioBuf。
private final ByteBufAllocator alloc; private byte[] array; private ByteBuffer tmpNioBuf;
在将 AbstractByteBuf 的时候,我们提到 getBytes、capacity 等方法是由子类来实现的,这里我们先看看 getBytes 的实现,从代码中可以看出来,是直接调用 System.arraycopy 进行的数组复制。
@Override public ByteBuf getBytes(int index, byte[] dst, int dstIndex, int length) { checkDstIndex(index, length, dstIndex, dst.length); System.arraycopy(array, index, dst, dstIndex, length); return this; }
接下来看一下动态伸缩的 capacity 方法,主要做了以下几件事: 1. 参数校验,newCapacity 不能小于 0,大于 maxCapacity。 2. 如果 maxCapacity 大于 oldCapacity 表示扩容,直接申请新的 byte 数组,进行内存复制即可。 3. 如果 maxCapacity 小于 oldCapacity 就是缩容了,同样申请 byte 数组。不同的是,需要根据读指针 readerIndex 与 newCapacity 的大小来决定是否需要进行内存复制。当 readerIndex 小于 newCapacity 时,需要复制内存,否则不需要。 4. 设置合适的读写指针位置。 5. 更新缓冲区字节数组引用 array 的值。
@Override public ByteBuf capacity(int newCapacity) { ensureAccessible(); if (newCapacity < 0 || newCapacity > maxCapacity()) { throw new IllegalArgumentException("newCapacity: " + newCapacity); }
int oldCapacity = array.length; if (newCapacity > oldCapacity) { byte[] newArray = new byte[newCapacity]; System.arraycopy(array, 0, newArray, 0, array.length); setArray(newArray); } else if (newCapacity < oldCapacity) { byte[] newArray = new byte[newCapacity]; int readerIndex = readerIndex(); if (readerIndex < newCapacity) { int writerIndex = writerIndex(); if (writerIndex > newCapacity) { writerIndex(writerIndex = newCapacity); } System.arraycopy(array, readerIndex, newArray, readerIndex, writerIndex - readerIndex); } else { setIndex(newCapacity, newCapacity); } setArray(newArray); } return this; }
private void setArray(byte[] initialArray) { array = initialArray; tmpNioBuf = null; }
public ByteBuf setIndex(int readerIndex, int writerIndex) { if (readerIndex < 0 || readerIndex > writerIndex || writerIndex > capacity()) { throw new IndexOutOfBoundsException(String.format( "readerIndex: %d, writerIndex: %d (expected: 0 <= readerIndex <= writerIndex <= capacity(%d))", readerIndex, writerIndex, capacity())); } this.readerIndex = readerIndex; this.writerIndex = writerIndex; return this; }
从 ByteBuf 到 ByteBuffer 的转换,主要是使用了 ByteBuffer 的 wrap 方法:
@Override public ByteBuffer nioBuffer(int index, int length) { ensureAccessible(); return ByteBuffer.wrap(array, index, length).slice(); }
PooledByteBuf
PooledByteBuf 是 ByteBuf 的内存池实现,应用自己实现的内存池管理策略,一般和操作系统的内存管理策略差不多,往往会更简单些。PooledByteBuf 内存池的分配和释放,主要通过 PoolArena 来实现。比如在 capacity 方法中,最终会使用 arena 的 reallocate 方法来重新分配内存。
public final ByteBuf capacity(int newCapacity) { ensureAccessible();
// If the request capacity does not require reallocation, just update the length of the memory. if (chunk.unpooled) { if (newCapacity == length) { return this; } } else { if (newCapacity > length) { if (newCapacity <= maxLength) { length = newCapacity; return this; } } else if (newCapacity < length) { if (newCapacity > maxLength >>> 1) { if (maxLength <= 512) { if (newCapacity > maxLength - 16) { length = newCapacity; setIndex(Math.min(readerIndex(), newCapacity), Math.min(writerIndex(), newCapacity)); return this; } } else { // > 512 (i.e. >= 1024) length = newCapacity; setIndex(Math.min(readerIndex(), newCapacity), Math.min(writerIndex(), newCapacity)); return this; } } } else { return this; } }
// 最终使用 arena 的 reallocate 方法来重新分配内存。 chunk.arena.reallocate(this, newCapacity, true); return this; }
PoolArena 是由多个 PoolChunk 组成的大块内存区域。
abstract class PoolArena<T> { static final int numTinySubpagePools = 512 >>> 4;
final PooledByteBufAllocator parent;
private final int maxOrder; final int pageSize; final int pageShifts; final int chunkSize; final int subpageOverflowMask; final int numSmallSubpagePools; private final PoolSubpage<T>[] tinySubpagePools; private final PoolSubpage<T>[] smallSubpagePools;
private final PoolChunkList<T> q050; private final PoolChunkList<T> q025; private final PoolChunkList<T> q000; private final PoolChunkList<T> qInit; private final PoolChunkList<T> q075; private final PoolChunkList<T> q100; } // PoolChunkList 是 PoolChunk 组成的链表 final class PoolChunkList<T> { private final PoolArena<T> arena; private final PoolChunkList<T> nextList; PoolChunkList<T> prevList;
private final int minUsage; private final int maxUsage;
private PoolChunk<T> head; } 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 2
每个 PoolChunk 由多个 PoolSubpage 组成。
final class PoolChunk<T> {
final PoolArena<T> arena; final T memory; final boolean unpooled;
private final byte[] memoryMap; private final byte[] depthMap; private final PoolSubpage<T>[] subpages; /** Used to determine if the requested capacity is equal to or greater than pageSize. */ private final int subpageOverflowMask; private final int pageSize; private final int pageShifts; private final int maxOrder; private final int chunkSize; private final int log2ChunkSize; private final int maxSubpageAllocs; /** Used to mark memory as unusable */ private final byte unusable;
private int freeBytes; // 当前 chunk 空闲字节数目
PoolChunkList<T> parent; // 父节点 PoolChunk<T> prev; // 链表前一个节点 PoolChunk<T> next; // 链表后一个节点 }
2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 PoolSubpage 负责管理一个 Page 的内存,通过 bitmap 中的每一位来标记每一块儿内存的占用状态。
final class PoolSubpage<T> { final PoolChunk<T> chunk; private final int memoryMapIdx; // 当前page在chunk中的id private final int runOffset; // 当前page在chunk.memory的偏移量 private final int pageSize; // page大小 private final long[] bitmap; // 通过对每一个二进制位的标记来修改一段内存的占用状态
PoolSubpage<T> prev; PoolSubpage<T> next;
boolean doNotDestroy; int elemSize; private int maxNumElems; private int bitmapLength; private int nextAvail; private int numAvail; }
PooledDirectByteBuf
PooledDirectByteBuf 基于内存池实现,与 UnPooledDirectByteBuf 的唯一区别就是,缓冲区的分配和销毁策略不同。不仅缓冲区所需内存使用内存池分配管理,PooledDirectByteBuf 对象本身,也使用 Recycler 管理。 比如 PooledDirectByteBuf 创建示例调用的是 Recycler 的 get 方法。
final class PooledDirectByteBuf extends PooledByteBuf<ByteBuffer> {
private static final Recycler<PooledDirectByteBuf> RECYCLER = new Recycler<PooledDirectByteBuf>() { @Override protected PooledDirectByteBuf newObject(Handle<PooledDirectByteBuf> handle) { return new PooledDirectByteBuf(handle, 0); } };
static PooledDirectByteBuf newInstance(int maxCapacity) { PooledDirectByteBuf buf = RECYCLER.get(); buf.setRefCnt(1); buf.maxCapacity(maxCapacity); }
Recycler 是一个轻量级的对象池,一个对象池最核心的方法是从池中获取对象和回收对象到池中,分别对应其 get 和 recycle 方法。
public abstract class Recycler<T> { public final T get() { Stack<T> stack = threadLocal.get(); DefaultHandle<T> handle = stack.pop(); if (handle == null) { handle = stack.newHandle(); handle.value = newObject(handle); } return (T) handle.value; } public final boolean recycle(T o, Handle<T> handle) { DefaultHandle<T> h = (DefaultHandle<T>) handle; if (h.stack.parent != this) { return false; }
h.recycle(o); return true; } }
PooledDirectByteBuf 中的 copy 方法用于复制一个新的字节缓冲区实例,该方法首先调用 PooledByteBufAllocator 的 directBuffer 来生成新的 ByteBuf,然后复制数据。
@Override public ByteBuf copy(int index, int length) { checkIndex(index, length); ByteBuf copy = alloc().directBuffer(length, maxCapacity()); copy.writeBytes(this, index, length); return copy; }
directBuffer 是在抽象类 AbstractByteBufAllocator 中实现的,进行参数校验之后调用 newDirectBuffer 来获取 ByteBuf,该方法由子类来实现。
@Override public ByteBuf directBuffer(int initialCapacity, int maxCapacity) { if (initialCapacity == 0 && maxCapacity == 0) { return emptyBuf; } validate(initialCapacity, maxCapacity); return newDirectBuffer(initialCapacity, maxCapacity); }
在内存池版本 PooledByteBufAllocator 的实现中,判断如果内存池 directArena 可用,则从中获取,否则自行 new 一个。
@Override protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) { PoolThreadCache cache = threadCache.get(); PoolArena<ByteBuffer> directArena = cache.directArena;
ByteBuf buf; if (directArena != null) { buf = directArena.allocate(cache, initialCapacity, maxCapacity); } else { if (PlatformDependent.hasUnsafe()) { buf = new UnpooledUnsafeDirectByteBuf(this, initialCapacity, maxCapacity); } else { buf = new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity); } }
return toLeakAwareBuffer(buf); }
而在非内存池版本 UnpooledByteBufAllocator 中,则是直接 new 一个。
@Override protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) { ByteBuf buf; if (PlatformDependent.hasUnsafe()) { buf = new UnpooledUnsafeDirectByteBuf(this, initialCapacity, maxCapacity); } else { buf = new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity); }
return toLeakAwareBuffer(buf); }
辅助类
内存分配相关: ByteBufAllocator 及其子类 UnpooledByteBufAllocator、PooledByteBufAllocator。 组合视图:CompositeByteBuf。 工具类:ByteBufUtil。
|