概要划分
一个最基本的网络协议必须包含
数据的长度
数据
了解 TCP 协议的同学一定听说过粘包、拆包 这两个术语。因为TCP协议是数据流协议,它的底层根据二进制缓冲区的实际情况进行包的划分。所以,不可避免的会出现粘包,拆包 现象 。为了解决它们,我们的网络协议往往会使用一个 4 字节的 int 类型来表示数据的大小。比如,Netty 就为我们提供了 LengthFieldBasedFrameDecoder 解码器,它可以有效的使用自定义长度帧来解决上述问题。
同时一个好的网络协议,还会将动作和业务数据分离。试想一下, HTTP 协议的分为请求头,请求体——
请求头:定义了接口地址、Http Method、HTTP 版本
请求体:定义了需要传递的数据
这就是一种分离关注点的思想。所以自定义的网络协议也可以包含:
动作指令:比如定义 code 来分门别类的代表不同的业务逻辑
序列化算法:描述了 JAVA 对象和二进制之间转换的形式,提供多种序列化/反序列化方式。比如 json、protobuf 等等,甚至是自定义算法。比如:rocketmq 等等。
同时,协议的开头可以定义一个约定的魔数。这个固定值(4字节),一般用来判断当前的数据包是否合法。比如,当我们使用 telnet 发送错误的数据包时,很显然,它不合法,会导致解码失败。所以,为了减轻服务器的压力,我们可以取出数据包的前4个字节与固定的魔数对比,如果是非法的格式,直接关闭连接,不继续解码。
网络协议结构如下所示:
+--------------+-----------+------------+-----------+----------+
| 魔数(4) | code(1) |序列化算法(1) |数据长度(4) |数据(n) |
+--------------+-----------+------------+-----------+----------+
回到顶部
RocketMQ 通信网络协议的实现
RocketMQ 网络协议
这一小节,我们从RocketMQ 中,分析优秀通信网络协议的实现。RocketMQ 项目中,客户端和服务端的通信是基于 Netty 之上构建的。同时,为了更加有效的通信,往往需要对发送的消息自定义网络协议。
RocketMQ 的网络协议,从数据分类的角度上看,可分为两大类
消息头数据(Header Data)
消息体数据(Body Data)
从左到右
第一段:4 个字节整数,等于2、3、4 长度总和
第二段:4 个字节整数,等于3 的长度。特别的 byte[0] 代表序列化算法,byte[1~3]才是真正的长度
第三段:代表消息头数据,结构如下
{
"code":0,
"language":"JAVA",
"version":0,
"opaque":0,
"flag":1,
"remark":"hello, I am respponse /127.0.0.1:27603",
"extFields":{
"count":"0",
"messageTitle":"HelloMessageTitle"
}
}
第四段:代表消息体数据
RocketMQ 消息头协议详细如下:
Header 字段名 类型 Request Response
code 整数 请求操作代码,请求接收方根据不同的代码做不同的操作 应答结果代码,0表示成功,非0表示各种错误代码
language 字符串 请求发起方实现语言,默认JAVA 应答接收方实现语言
version 整数 请求发起方程序版本 应答接收方程序版本
opaque 整数 请求发起方在同一连接上不同的请求标识代码,多线程连接复用使用 应答方不做修改,直接返回
flag 整数 通信层的标志位 通信层的标志位
remark 字符串 传输自定义文本信息 错误详细描述信息
extFields HashMap<String,String> 请求自定义字段 应答自定义字段
编码过程
RocketMQ 的通信模块是基于 Netty的。通过定义 NettyEncoder 来实现对每一个 Channel的 出栈数据进行编码,如下所示:
@ChannelHandler.Sharable
public class NettyEncoder extends MessageToByteEncoder<RemotingCommand> {
@Override
public void encode(ChannelHandlerContext ctx, RemotingCommand remotingCommand, ByteBuf out)
throws Exception {
try {
ByteBuffer header = remotingCommand.encodeHeader();
out.writeBytes(header);
byte[] body = remotingCommand.getBody();
if (body != null) {
out.writeBytes(body);
}
} catch (Exception e) {
...
}
}
}
其中,核心的编码过程位于 RemotingCommand 对象中,encodeHeader 阶段,需要统计出消息总长度,即:
定义消息头长度,一个整数表示:占4个字节
定义消息头数据,并计算其长度
定义消息体数据,并计算其长度
额外再加 4是因为需要加入消息总长度,一个整数表示:占4个字节
public ByteBuffer encodeHeader(final int bodyLength) {
// 1> 消息头长度,一个整数表示:占4个字节
int length = 4;
// 2> 消息头数据
byte[] headerData;
headerData = this.headerEncode();
// 再加消息头数据长度
length += headerData.length;
// 3> 再加消息体数据长度
length += bodyLength;
// 4> 额外加 4是因为需要加入消息总长度,一个整数表示:占4个字节
ByteBuffer result = ByteBuffer.allocate(4 + length - bodyLength);
// 5> 将消息总长度加入 ByteBuffer
result.putInt(length);
// 6> 将消息的头长度加入 ByteBuffer
result.put(markProtocolType(headerData.length, serializeTypeCurrentRPC));
// 7> 将消息头数据加入 ByteBuffer
result.put(headerData);
result.flip();
return result;
}
其中,encode 阶段会将 CommandCustomHeader 数据转换 HashMap<String,String>,方便序列化
public void makeCustomHeaderToNet() {
if (this.customHeader != null) {
Field[] fields = getClazzFields(customHeader.getClass());
if (null == this.extFields) {
this.extFields = new HashMap<String, String>();
}
for (Field field : fields) {
if (!Modifier.isStatic(field.getModifiers())) {
String name = field.getName();
if (!name.startsWith("this")) {
Object value = null;
try {
field.setAccessible(true);
value = field.get(this.customHeader);
} catch (Exception e) {
log.error("Failed to access field [{}]", name, e);
}
if (value != null) {
this.extFields.put(name, value.toString());
}
}
}
}
}
}
特别的,消息头序列化支持两种算法:
JSON
RocketMQ
private byte[] headerEncode() {
this.makeCustomHeaderToNet();
if (SerializeType.ROCKETMQ == serializeTypeCurrentRPC) {
return RocketMQSerializable.rocketMQProtocolEncode(this);
} else {
return RemotingSerializable.encode(this);
}
}
这儿需要值得注意的是,encode阶段将当前 RPC 类型和 headerData长度编码到一个 byte[4] 数组中,byte[0] 位序列化类型。
public static byte[] markProtocolType(int source, SerializeType type) {
byte[] result = new byte[4];
result[0] = type.getCode();
result[1] = (byte) ((source >> 16) & 0xFF);
result[2] = (byte) ((source >> 8) & 0xFF);
result[3] = (byte) (source & 0xFF);
return result;
}
其中,通过与运算 & 0xFF 取低八位数据。
所以, 最终 length 长度等于序列化类型 + header length + header data + body data 的字节的长度。
解码过程
RocketMQ 解码通过NettyDecoder来实现,它继承自 LengthFieldBasedFrameDecoder,其中调用了父类LengthFieldBasedFrameDecoder的构造函数
super(FRAME_MAX_LENGTH, 0, 4, 0, 4);
这些参数设置4个字节代表 length总长度,同时解码时跳过最开始的4个字节:
frame = (ByteBuf) super.decode(ctx, in);
所以,得到的 frame= 序列化类型 + header length + header data + body data 。解码如下所示:
public static RemotingCommand decode(final ByteBuffer byteBuffer) {
//总长度
int length = byteBuffer.limit();
//原始的 header length,4位
int oriHeaderLen = byteBuffer.getInt();
//真正的 header data 长度。忽略 byte[0]的 serializeType
int headerLength = getHeaderLength(oriHeaderLen);
byte[] headerData = new byte[headerLength];
byteBuffer.get(headerData);
RemotingCommand cmd = headerDecode(headerData, getProtocolType(oriHeaderLen));
int bodyLength = length - 4 - headerLength;
byte[] bodyData = null;
if (bodyLength > 0) {
bodyData = new byte[bodyLength];
byteBuffer.get(bodyData);
}
cmd.body = bodyData;
return cmd;
}
private static RemotingCommand headerDecode(byte[] headerData, SerializeType type) {
switch (type) {
case JSON:
RemotingCommand resultJson = RemotingSerializable.decode(headerData, RemotingCommand.class);
resultJson.setSerializeTypeCurrentRPC(type);
return resultJson;
case ROCKETMQ:
RemotingCommand resultRMQ = RocketMQSerializable.rocketMQProtocolDecode(headerData);
resultRMQ.setSerializeTypeCurrentRPC(type);
return resultRMQ;
default:
break;
}
return null;
}
其中,getProtocolType,右移 24位,拿到 serializeType:
public static SerializeType getProtocolType(int source) {
return SerializeType.valueOf((byte) ((source >> 24) & 0xFF));
}
getHeaderLength 拿到 0-24 位代表的 headerData length:
public static int getHeaderLength(int length) {
return length & 0xFFFFFF;
}
|
|