A股上市公司传智教育(股票代码 003032)旗下技术交流社区北京昌平校区

 找回密码
 加入黑马

QQ登录

只需一步,快速开始

Netty 权威指南笔记(三):TCP 粘包和拆包什么是 TCP 粘包和拆包?

TCP 是一个“流”协议,所谓“流”就是没有界限的一串数据。大家可以想像河流里的水,期间并没有分界线。TCP 底层并不了解上层业务数据的具体含义,它会根据 TCP 缓冲区的实际情况进行包的划分。所以,在业务上,一个完整的包可能会被 TCP 拆分成多个包进行发送,也有可能把多个小的包,封装成一个大的数据包发送,这就是所谓的 TCP 粘包和拆包问题。

为什么会发生粘包和拆包?

主要是应用程序写入数据大小、缓冲区大小、TCP/IP 最大报文大小、以太网帧 payload 大小不一致造成。

  • 应用程序一次写入的字节数大于发送缓冲区大小。
  • 进行 TSS 大小的 TCP 分段。
  • 以太网帧的 payload 大于 MTU 进行 IP 分片。
  • 应用程序多次写入少量数据,导致粘包。
解决策略

由于底层的 TCP 无法理解上层的业务数据,所以在底层是无法保证数据包不被拆分和重组的,这个问题只能通过上层的应用协议栈设计来解决。根据业界的主流协议的解决方案,归纳如下:

  • 消息定长。
  • 包尾增加分隔符,比如 FTP 协议使用回车换行符进行分割。
  • 将消息分为消息头和消息体,消息头中包含表示消息总长度的字段,这正是 TCP/UDP/IP 报文采用的方案。
  • 更复杂的应用层协议设计。

在 Netty 中,有处理定长消息的 FixedLengthFrameDecoder、回车换行符分隔的 LineBasedFrameDecoder、特殊分隔符的 DelimiterBasedFrameDecoder,以及处理特殊协议的 Decoder,比如处理 HTTP 协议的 HttpRequestDecoder、HttpResponseDecoder 等。

下面,我们以定长消息解码器 FixedLengthFrameDecoder 为例,分析源码,学习一下其工作原理。

FixedLengthFrameDecoder 源码分析

从下面的类图中可以看出来,这些 Decoder 都继承自 ByteToMessageDecoder、ChannelHandlerAdapter,实现了 ChannelHandler 接口。回顾之前使用 Netty 开发的 TimeServer 程序中,TimeServerHandler 也是继承自 ChannelHandlerAdapter。

ChannelHandler 接口中,与读取数据相关的主要是 channelRead 方法。


public interface ChannelHandler {

    // 读取数据

    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception;

    // 读取数据完成

    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception;

}


下面我们看一下 ByteToMessageDecoder 中实现的 channelRead 方法:


    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {

        // 只处理 ByteBuf 类型数据

        if (msg instanceof ByteBuf) {

            // RecyclableArrayList 是一个可循环使用的 ArrayList,使用它是为了减少 GC

            RecyclableArrayList out = RecyclableArrayList.newInstance();

            try {

                ByteBuf data = (ByteBuf) msg;

                // cumulation 是上次处理数据后遗留的半包数据

                first = cumulation == null;

                if (first) {

                    cumulation = data;

                } else {

                    // 上次遗留数据和本次数据进行合并

                    cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data);

                }

                // 对数据进行解码,解码成功的数据存入 out,半包数据赋值给 cumulation

                callDecode(ctx, cumulation, out);

            } catch (DecoderException e) {

                throw e;

            } catch (Throwable t) {

                throw new DecoderException(t);

            } finally {

                if (cumulation != null && !cumulation.isReadable()) {

                    cumulation.release();

                    cumulation = null;

                }

                int size = out.size();


                // 如果 out.size 大于 0,表示有解码成功的数据,发送到下一个 ChannelAdapter 进行处理

                for (int i = 0; i < size; i ++) {

                    ctx.fireChannelRead(out.get(i));

                }

                out.recycle();

            }

        } else {

            ctx.fireChannelRead(msg);

        }

    }


    protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {

        try {

            while (in.isReadable()) {

                int outSize = out.size();

                int oldInputLength = in.readableBytes();

                // 解析数据,并存入 out 中

                decode(ctx, in, out);


                // Check if this handler was removed before continuing the loop.

                // If it was removed, it is not safe to continue to operate on the buffer.

                if (ctx.isRemoved()) {

                    break;

                }


                if (outSize == out.size()) {

                    if (oldInputLength == in.readableBytes()) {

                        break;

                    } else {

                        continue;

                    }

                }

                // 如果成功解码出数据,但是 ByteBuf 中数据长度不变,可能会导致死循环

                if (oldInputLength == in.readableBytes()) {

                    throw new DecoderException(

                            StringUtil.simpleClassName(getClass()) +

                            ".decode() did not read anything but decoded a message.");

                }


                if (isSingleDecode()) {

                    break;

                }

            }

        } catch (DecoderException e) {

            throw e;

        } catch (Throwable cause) {

            throw new DecoderException(cause);

        }

    }


    protected abstract void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception;


首先判断输入数据是否是 ByteBuf 类型,是则处理,否则传递下去。

然后初始化一个 RecyclableArrayList 用来保存解析成功的数据片。

将上次解析遗留的数据 cumulation 和本次到来的数据进行合并。

调用 decode 方法循环解析合并后的数据,存入列表 out。

如果列表 out 中有解析成功的数据,则调用 fireChannelRead 方法发送给下一个 ChannelHandler 处理。

decode 方法是一个抽象方法,由子类 FixedLengthFrameDecoder 负责实现,具体源码如下所示。其原理是,如果输入数据 ByteBuf 长度超过 frameLength,则截取前 frameLength 字节数据为一个新的 ByteBuf 数据分片,存入列表 out 中。


public class FixedLengthFrameDecoder extends ByteToMessageDecoder {

    private final int frameLength;


    public FixedLengthFrameDecoder(int frameLength) {

        if (frameLength <= 0) {

            throw new IllegalArgumentException(

                    "frameLength must be a positive integer: " + frameLength);

        }

        this.frameLength = frameLength;

    }


    @Override

    protected final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {

        Object decoded = decode(ctx, in);

        if (decoded != null) {

            // 读取固定长度 frameLength 的数据分片,存入列表 out 中

            out.add(decoded);

        }

    }

    protected Object decode(@SuppressWarnings("UnusedParameters") ChannelHandlerContext ctx, ByteBuf in) throws Exception {

        // 如果小于长度 frameLength,则返回 null

        if (in.readableBytes() < frameLength) {

            return null;

        } else {

            // 否则,读取长度为 frameLength 的数据,为一个 ByteBuf 数据分片

            return in.readSlice(frameLength).retain();

        }

    }

}


0 个回复

您需要登录后才可以回帖 登录 | 加入黑马