A股上市公司传智教育(股票代码 003032)旗下技术交流社区北京昌平校区

 找回密码
 加入黑马

QQ登录

只需一步,快速开始

功能介绍

Java 本身提供了 ByteBuffer 类,为什么 Netty 还要搞一个 ByteBuf 类呢?因为 ByteBuffer 类有着许多缺点:

  • ByteBuffer 长度固定,无法动态伸缩。
  • ByteBuffer 只有一个位置指针 position,读写的时候需要手工调用 flip 和 rewind 方法进行模式转换,操作繁琐,容易出错。
  • 功能太少,缺少一些高级特性。

为了弥补这些不足,Netty 提供了自己的缓冲区类实现 ByteBuf。有什么特点呢?

  • 两个位置指针协助缓冲区的读写操作:readerIndex、writerIndex,读写之间不需要调整指针位置,大大简化了读写操作。
  • 可以动态扩容。
  • 当有部分内容已经读取完成时,可以通过 discard 操作对缓冲区进行整理,在不重新申请内存的情况下,增大可写字节数目。
  • 支持标记和回滚的功能。
  • 支持在 ByteBuf 中查找某个字符串。
  • 派生出另一个 ByteBuf:duplicate、copy、slice。
  • 转化成标准的 ByteBuffer,这是因为在使用 NIO 进行网络读写时,操作的对象还是 JDK 标准的 ByteBuffer。
  • 随机读写。
源码分析继承关系

ByteBuf 的主要功能类继承关系如下图所示:

从内存分配的角度看,ByteBuf 可以分为两类:
1. 堆内存字节缓冲区 HeapByteBuf:优点是内存分配和回收速度快,可以被 JVM 自动回收。缺点是,如果进行 Socket 的 I/O 读写,需要额外做一次内存复制,在堆内存缓冲区和内核 Channel 之间进行复制,性能会有一定程度下降。
2. 直接内存字节缓冲区 DirectByteBuf:非堆内存,直接在堆外进行分配。相比于堆内存,内存分配和回收稍慢,但是可以减少复制,提升性能。

两种内存,各有利弊。Netty 最佳实践表明:在 I/O 通信线程的读写缓冲区使用 DirectByteBuf,后端业务消息的编解码模块使用 HeapByteBuf,这样组合可以达到性能最优。

从内存回收的角度看,ByteBuf 也分为两类:基于对象池的 ByteBuf 和普通 ByteBuf。两者区别在于基于对象池的 ByteBuf 可以重用 ByteBuf 对象,它自己维护了一个内存池,可以循环利用创建的 ByteBuf,提升内存的使用效率,降低由于高负载导致的频繁 GC。内存池的缺点是管理和维护比较复杂,使用时需要更加谨慎。

下面我们对一些关键类进行分析和解读。

AbstractByteBuf

AbstractByteBuf 都做了哪些事儿呢?我们先看一下其主要的成员变量:
1. 读写指针。
2. 用于标记回滚的 marked 读写指针。
3. 最大容量 maxCapacity,用于进行内存保护。
4. 与本 ByteBuf 大小端属性相反的 ByteBuf:SwappedByteBuf。

我们发现这里没有真正存储数据的数据结构,例如 byte 数组或 DirectByteBuffer,原因是这里还不知道子类是要基于堆内存还是直接内存。


public abstract class AbstractByteBuf extends ByteBuf {

    static final ResourceLeakDetector<ByteBuf> leakDetector = new ResourceLeakDetector<ByteBuf>(ByteBuf.class);


    int readerIndex;

    int writerIndex;

    private int markedReaderIndex;

    private int markedWriterIndex;


    private int maxCapacity;


    private SwappedByteBuf swappedBuf;

}



接下来我们看看读操作 readBytes 方法,AbstractByteBuf 类做了什么呢?

1. 在 checkReadableBytes 方法中,检查入参有效性。

2. 修改读指针 readerIndex。


    @Override

    public ByteBuf readBytes(byte[] dst, int dstIndex, int length) {

        checkReadableBytes(length);

        // getBytes 方法未在 AbstractByteBuf 中实现

        getBytes(readerIndex, dst, dstIndex, length);

        readerIndex += length;

        return this;

    }


    protected final void checkReadableBytes(int minimumReadableBytes) {

        ensureAccessible();

        if (minimumReadableBytes < 0) {

            throw new IllegalArgumentException("minimumReadableBytes: " + minimumReadableBytes + " (expected: >= 0)");

        }

        if (readerIndex > writerIndex - minimumReadableBytes) {

            throw new IndexOutOfBoundsException(String.format(

                    "readerIndex(%d) + length(%d) exceeds writerIndex(%d): %s",

                    readerIndex, minimumReadableBytes, writerIndex, this));

        }

    }



从当前 ByteBuf 中复制数据到 dst 是在 getBytes 方法中,该方法未在 AbstractByteBuf 中实现,也是因为此时具体如何存储数据尚不确定。


下面我们看一下写操作 writeBytes 方法,AbstractByteBuf 负责实现了哪些操作呢?

1. 有效性检查,如果引用计数 refCnt 为 0,表示该 ByteBuf 已经被回收,不能再写入。

2. 输入参数有效性检查:要写入的数据量不能小于 0,写入之后总数据量也不能大于最大容量。

3. 当容量不足时,如果尚未超过最大容量,则进行扩容。

4. 修改写指针。


    @Override

    public ByteBuf writeBytes(byte[] src, int srcIndex, int length) {

        ensureAccessible();

        ensureWritable(length);

        // setBytes 交给子类实现。

        setBytes(writerIndex, src, srcIndex, length);

        writerIndex += length;

        return this;

    }


    protected final void ensureAccessible() {

        if (refCnt() == 0) {

            throw new IllegalReferenceCountException(0);

        }

    }


    @Override

    public ByteBuf ensureWritable(int minWritableBytes) {

        if (minWritableBytes < 0) {

            throw new IllegalArgumentException(String.format(

                    "minWritableBytes: %d (expected: >= 0)", minWritableBytes));

        }


        if (minWritableBytes <= writableBytes()) {

            return this;

        }


        if (minWritableBytes > maxCapacity - writerIndex) {

            throw new IndexOutOfBoundsException(String.format(

                    "writerIndex(%d) + minWritableBytes(%d) exceeds maxCapacity(%d): %s",

                    writerIndex, minWritableBytes, maxCapacity, this));

        }


        // Normalize the current capacity to the power of 2.

        int newCapacity = alloc().calculateNewCapacity(writerIndex + minWritableBytes, maxCapacity);


        // 具体扩容操作由子类实现。

        capacity(newCapacity);

        return this;

    }   


在读写操作中,AbstractByteBuf 主要负责参数校验、读写指针修改,以及写操作时的扩容计算。


除此之外,AbstractByteBuf 还提供了以下功能:

1. 操作索引:修改读写指针、mark & reset。

2. 重用缓冲区:discardReadBytes。

3. 丢弃部分数据:skipBytes。因为丢弃时,只需要修改读指针即可,与数据具体如何存储无关。


总结:在 AbstractByteBuf 中实现的是各个子类中通用的功能。


AbstractReferenceCountedByteBuf


从类名可以看出来,该类主要提供引用计数的功能,类似于 JVM 内存回收的对象引用计数器,用于跟踪对象的分配和回收,实现手动控制内存回收。


首先,我们看一下其成员变量:

1. refCnt:记录对象引用次数。

2. refCntUpdater:用于对 refCnt 进行原子更新。


    private static final AtomicIntegerFieldUpdater<AbstractReferenceCountedByteBuf> refCntUpdater;


    static {

        AtomicIntegerFieldUpdater<AbstractReferenceCountedByteBuf> updater =

                PlatformDependent.newAtomicIntegerFieldUpdater(AbstractReferenceCountedByteBuf.class, "refCnt");

        if (updater == null) {

            updater = AtomicIntegerFieldUpdater.newUpdater(AbstractReferenceCountedByteBuf.class, "refCnt");

        }

        refCntUpdater = updater;

    }


    private volatile int refCnt = 1;


接下来,我们看一下增加引用计数的 retain 方法。该方法是用 CAS 操作对 refCnt 进行加 1。另外,refCnt 值为 0 或 Integer.MAX_VALUE 值不能再操作,会抛出异常。


    @Override

    public ByteBuf retain() {

        for (;;) {

            int refCnt = this.refCnt;

            if (refCnt == 0) {

                throw new IllegalReferenceCountException(0, 1);

            }

            if (refCnt == Integer.MAX_VALUE) {

                throw new IllegalReferenceCountException(Integer.MAX_VALUE, 1);

            }

            if (refCntUpdater.compareAndSet(this, refCnt, refCnt + 1)) {

                break;

            }

        }

        return this;

    }


另一个 release 方法表示释放资源,会将引用计数 refCnt 减 1,如果当前 refCnt 等于 1,减 1 之后等于 0,表示对象已经没有被引用,可以被回收了,会调用 deallocate 方法释放内存。


    @Override

    public final boolean release() {

        for (;;) {

            int refCnt = this.refCnt;

            if (refCnt == 0) {

                throw new IllegalReferenceCountException(0, -1);

            }


            if (refCntUpdater.compareAndSet(this, refCnt, refCnt - 1)) {

                if (refCnt == 1) {

                    deallocate();

                    return true;

                }

                return false;

            }

        }

    }


在 UnpooledHeapByteBuf 中,释放内存仅仅是把 array 数组置为 null,剩下的内存回收工作交由 JVM 来完成。


    // in UnpooledHeapByteBuf.java

    private byte[] array;

    @Override

    protected void deallocate() {

        array = null;

    }


在 UnpooledDirectByteBuf 中,则是调用 PlatformDependent.freeDirectBuffer 来释放直接内存。


    // in UnpooledDirectByteBuf.java

    @Override

    protected void deallocate() {

        ByteBuffer buffer = this.buffer;

        if (buffer == null) {

            return;

        }


        this.buffer = null;


        if (!doNotFree) {

            freeDirect(buffer);

        }

    }


    protected void freeDirect(ByteBuffer buffer) {

        PlatformDependent.freeDirectBuffer(buffer);

    }   



UnpooledHeapByteBuf


UnpooledHeapByteBuf 是基于堆内存进行内存分配的字节缓冲区,它没有基于对象池实现,意味着每次 I/O 读写都会创建一个新的 UnpooledHeapByteBuf 对象,频繁进行内存的分配和释放对性能会有一定的影响,但是相对堆外内存的申请和释放,成本稍低。


相比于 PooledHeapByteBuf,不需要自己管理内存池,不容易出现内存管理方面的问题,更容易使用和维护。因此,在满足性能的情况下,推荐使用 UnpooledHeapByteBuf。


首先看一下 UnpooledHeapByteBuf 的成员变量:

1. 负责内存分配的 ByteBufAllocator。

2. 缓冲区实现 byte 数组。

3. 从 ByteBuf 到 NIO 的 ByteBuffer 的转换对象 tmpNioBuf。


    private final ByteBufAllocator alloc;

    private byte[] array;

    private ByteBuffer tmpNioBuf;


在将 AbstractByteBuf 的时候,我们提到 getBytes、capacity 等方法是由子类来实现的,这里我们先看看 getBytes 的实现,从代码中可以看出来,是直接调用 System.arraycopy 进行的数组复制。


    @Override

    public ByteBuf getBytes(int index, byte[] dst, int dstIndex, int length) {

        checkDstIndex(index, length, dstIndex, dst.length);

        System.arraycopy(array, index, dst, dstIndex, length);

        return this;

    }


接下来看一下动态伸缩的 capacity 方法,主要做了以下几件事:

1. 参数校验,newCapacity 不能小于 0,大于 maxCapacity。

2. 如果 maxCapacity 大于 oldCapacity 表示扩容,直接申请新的 byte 数组,进行内存复制即可。

3. 如果 maxCapacity 小于 oldCapacity 就是缩容了,同样申请 byte 数组。不同的是,需要根据读指针 readerIndex 与 newCapacity 的大小来决定是否需要进行内存复制。当 readerIndex 小于 newCapacity 时,需要复制内存,否则不需要。

4. 设置合适的读写指针位置。

5. 更新缓冲区字节数组引用 array 的值。


    @Override

    public ByteBuf capacity(int newCapacity) {

        ensureAccessible();

        if (newCapacity < 0 || newCapacity > maxCapacity()) {

            throw new IllegalArgumentException("newCapacity: " + newCapacity);

        }


        int oldCapacity = array.length;

        if (newCapacity > oldCapacity) {

            byte[] newArray = new byte[newCapacity];

            System.arraycopy(array, 0, newArray, 0, array.length);

            setArray(newArray);

        } else if (newCapacity < oldCapacity) {

            byte[] newArray = new byte[newCapacity];

            int readerIndex = readerIndex();

            if (readerIndex < newCapacity) {

                int writerIndex = writerIndex();

                if (writerIndex > newCapacity) {

                    writerIndex(writerIndex = newCapacity);

                }

                System.arraycopy(array, readerIndex, newArray, readerIndex, writerIndex - readerIndex);

            } else {

                setIndex(newCapacity, newCapacity);

            }

            setArray(newArray);

        }

        return this;

    }


    private void setArray(byte[] initialArray) {

        array = initialArray;

        tmpNioBuf = null;

    }


    public ByteBuf setIndex(int readerIndex, int writerIndex) {

        if (readerIndex < 0 || readerIndex > writerIndex || writerIndex > capacity()) {

            throw new IndexOutOfBoundsException(String.format(

                    "readerIndex: %d, writerIndex: %d (expected: 0 <= readerIndex <= writerIndex <= capacity(%d))",

                    readerIndex, writerIndex, capacity()));

        }

        this.readerIndex = readerIndex;

        this.writerIndex = writerIndex;

        return this;

    }





从 ByteBuf 到 ByteBuffer 的转换,主要是使用了 ByteBuffer 的 wrap 方法:


    @Override

    public ByteBuffer nioBuffer(int index, int length) {

        ensureAccessible();

        return ByteBuffer.wrap(array, index, length).slice();

    }


PooledByteBuf


PooledByteBuf 是 ByteBuf 的内存池实现,应用自己实现的内存池管理策略,一般和操作系统的内存管理策略差不多,往往会更简单些。PooledByteBuf 内存池的分配和释放,主要通过 PoolArena 来实现。比如在 capacity 方法中,最终会使用 arena 的 reallocate 方法来重新分配内存。


    public final ByteBuf capacity(int newCapacity) {

        ensureAccessible();


        // If the request capacity does not require reallocation, just update the length of the memory.

        if (chunk.unpooled) {

            if (newCapacity == length) {

                return this;

            }

        } else {

            if (newCapacity > length) {

                if (newCapacity <= maxLength) {

                    length = newCapacity;

                    return this;

                }

            } else if (newCapacity < length) {

                if (newCapacity > maxLength >>> 1) {

                    if (maxLength <= 512) {

                        if (newCapacity > maxLength - 16) {

                            length = newCapacity;

                            setIndex(Math.min(readerIndex(), newCapacity), Math.min(writerIndex(), newCapacity));

                            return this;

                        }

                    } else { // > 512 (i.e. >= 1024)

                        length = newCapacity;

                        setIndex(Math.min(readerIndex(), newCapacity), Math.min(writerIndex(), newCapacity));

                        return this;

                    }

                }

            } else {

                return this;

            }

        }


        // 最终使用 arena 的 reallocate 方法来重新分配内存。

        chunk.arena.reallocate(this, newCapacity, true);

        return this;

    }


PoolArena 是由多个 PoolChunk 组成的大块内存区域。


abstract class PoolArena<T> {

    static final int numTinySubpagePools = 512 >>> 4;


    final PooledByteBufAllocator parent;


    private final int maxOrder;

    final int pageSize;

    final int pageShifts;

    final int chunkSize;

    final int subpageOverflowMask;

    final int numSmallSubpagePools;

    private final PoolSubpage<T>[] tinySubpagePools;

    private final PoolSubpage<T>[] smallSubpagePools;


    private final PoolChunkList<T> q050;

    private final PoolChunkList<T> q025;

    private final PoolChunkList<T> q000;

    private final PoolChunkList<T> qInit;

    private final PoolChunkList<T> q075;

    private final PoolChunkList<T> q100;

}   

// PoolChunkList 是 PoolChunk 组成的链表

final class PoolChunkList<T> {

    private final PoolArena<T> arena;

    private final PoolChunkList<T> nextList;

    PoolChunkList<T> prevList;


    private final int minUsage;

    private final int maxUsage;


    private PoolChunk<T> head;

}   

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

2


每个 PoolChunk 由多个 PoolSubpage 组成。


final class PoolChunk<T> {


    final PoolArena<T> arena;

    final T memory;

    final boolean unpooled;


    private final byte[] memoryMap;

    private final byte[] depthMap;

    private final PoolSubpage<T>[] subpages;

    /** Used to determine if the requested capacity is equal to or greater than pageSize. */

    private final int subpageOverflowMask;

    private final int pageSize;

    private final int pageShifts;

    private final int maxOrder;

    private final int chunkSize;

    private final int log2ChunkSize;

    private final int maxSubpageAllocs;

    /** Used to mark memory as unusable */

    private final byte unusable;


    private int freeBytes;      // 当前 chunk 空闲字节数目


    PoolChunkList<T> parent;    // 父节点

    PoolChunk<T> prev;          // 链表前一个节点

    PoolChunk<T> next;          // 链表后一个节点

}


2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

PoolSubpage 负责管理一个 Page 的内存,通过 bitmap 中的每一位来标记每一块儿内存的占用状态。


final class PoolSubpage<T> {

    final PoolChunk<T> chunk;

    private final int memoryMapIdx; // 当前page在chunk中的id

    private final int runOffset;    // 当前page在chunk.memory的偏移量

    private final int pageSize;     // page大小

    private final long[] bitmap;    // 通过对每一个二进制位的标记来修改一段内存的占用状态


    PoolSubpage<T> prev;

    PoolSubpage<T> next;


    boolean doNotDestroy;

    int elemSize;

    private int maxNumElems;

    private int bitmapLength;

    private int nextAvail;

    private int numAvail;

}




PooledDirectByteBuf


PooledDirectByteBuf 基于内存池实现,与 UnPooledDirectByteBuf 的唯一区别就是,缓冲区的分配和销毁策略不同。不仅缓冲区所需内存使用内存池分配管理,PooledDirectByteBuf 对象本身,也使用 Recycler 管理。 比如 PooledDirectByteBuf 创建示例调用的是 Recycler 的 get 方法。


final class PooledDirectByteBuf extends PooledByteBuf<ByteBuffer> {


    private static final Recycler<PooledDirectByteBuf> RECYCLER = new Recycler<PooledDirectByteBuf>() {

        @Override

        protected PooledDirectByteBuf newObject(Handle<PooledDirectByteBuf> handle) {

            return new PooledDirectByteBuf(handle, 0);

        }

    };


    static PooledDirectByteBuf newInstance(int maxCapacity) {

        PooledDirectByteBuf buf = RECYCLER.get();

        buf.setRefCnt(1);

        buf.maxCapacity(maxCapacity);

}   




Recycler 是一个轻量级的对象池,一个对象池最核心的方法是从池中获取对象和回收对象到池中,分别对应其 get 和 recycle 方法。


public abstract class Recycler<T> {

    public final T get() {

        Stack<T> stack = threadLocal.get();

        DefaultHandle<T> handle = stack.pop();

        if (handle == null) {

            handle = stack.newHandle();

            handle.value = newObject(handle);

        }

        return (T) handle.value;

    }   

    public final boolean recycle(T o, Handle<T> handle) {

        DefaultHandle<T> h = (DefaultHandle<T>) handle;

        if (h.stack.parent != this) {

            return false;

        }


        h.recycle(o);

        return true;

    }

}



PooledDirectByteBuf 中的 copy 方法用于复制一个新的字节缓冲区实例,该方法首先调用 PooledByteBufAllocator 的 directBuffer 来生成新的 ByteBuf,然后复制数据。


    @Override

    public ByteBuf copy(int index, int length) {

        checkIndex(index, length);

        ByteBuf copy = alloc().directBuffer(length, maxCapacity());

        copy.writeBytes(this, index, length);

        return copy;

    }


directBuffer 是在抽象类 AbstractByteBufAllocator 中实现的,进行参数校验之后调用 newDirectBuffer 来获取 ByteBuf,该方法由子类来实现。


    @Override

    public ByteBuf directBuffer(int initialCapacity, int maxCapacity) {

        if (initialCapacity == 0 && maxCapacity == 0) {

            return emptyBuf;

        }

        validate(initialCapacity, maxCapacity);

        return newDirectBuffer(initialCapacity, maxCapacity);

    }



在内存池版本 PooledByteBufAllocator 的实现中,判断如果内存池 directArena 可用,则从中获取,否则自行 new 一个。


    @Override

    protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {

        PoolThreadCache cache = threadCache.get();

        PoolArena<ByteBuffer> directArena = cache.directArena;


        ByteBuf buf;

        if (directArena != null) {

            buf = directArena.allocate(cache, initialCapacity, maxCapacity);

        } else {

            if (PlatformDependent.hasUnsafe()) {

                buf = new UnpooledUnsafeDirectByteBuf(this, initialCapacity, maxCapacity);

            } else {

                buf = new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity);

            }

        }


        return toLeakAwareBuffer(buf);

    }


而在非内存池版本 UnpooledByteBufAllocator 中,则是直接 new 一个。


    @Override

    protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {

        ByteBuf buf;

        if (PlatformDependent.hasUnsafe()) {

            buf = new UnpooledUnsafeDirectByteBuf(this, initialCapacity, maxCapacity);

        } else {

            buf = new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity);

        }


        return toLeakAwareBuffer(buf);

    }


辅助类


内存分配相关: ByteBufAllocator 及其子类 UnpooledByteBufAllocator、PooledByteBufAllocator。

组合视图:CompositeByteBuf。

工具类:ByteBufUtil。


0 个回复

您需要登录后才可以回帖 登录 | 加入黑马