A股上市公司传智教育(股票代码 003032)旗下技术交流社区北京昌平校区

 找回密码
 加入黑马

QQ登录

只需一步,快速开始



  broker消息接收,假设接收的是一个普通消息(即没有事务),此处分析也只分析master上动作逻辑,不涉及ha。
  1. 如何找到消息接收处理入口
  可以通过broker的监听端口10911顺藤摸瓜式的找到
  NettyClientConfig.setListenPort-->BrokerStartup-->BrokerController-->NettyRemotingServer
  com.alibaba.rocketmq.remoting.netty.NettyDecoder
  com.alibaba.rocketmq.remoting.netty.NettyRemotingServer.NettyServerHandler.channelRead0(ChannelHandlerContext, RemotingCommand)
  也可以顺着broker的启动脚本找到
  BrokerStartup-->BrokerController-->NettyRemotingServer
  因为rocketmq连接层默认使用netty开发,如果熟悉netty的话,
  可以直接查找ChannelInitializer或者pipeline().addLast等
  2. 调试NettyDecoder
  com.alibaba.rocketmq.remoting.netty.NettyDecoder.decode(ChannelHandlerContext, ByteBuf)
  会调用RemotingCommand.decode(byteBuffer)组装RemotingCommand
  在com.alibaba.rocketmq.remoting.protocol.RemotingCommand.decode(ByteBuffer) 中断点
  排除如下code
  cmd.getCode() != 0 && cmd.getCode() != 34 && cmd.getCode() != 15
  && cmd.getCode() != 38 && cmd.getCode() != 11
  用producer的client发一条消息到broker
  发现RemotingCommand.decode解析出来的RemotingCommand的code为310
  310对应RequestCode.SEND_MESSAGE_V2 发送消息
  此时处理线程是:Thread [NettyServerWorkerThread_2]
  RemotingCommand主要描述了操作code、opaque号、body即消息体。
  3. 调试NettyServerHandler
  对com.alibaba.rocketmq.remoting.netty.NettyRemotingServer.NettyServerHandler.channelRead0(ChannelHandlerContext, RemotingCommand)断点
  断点条件属性是msg.getCode() == 310
  此时处理线程是:Thread [NettyServerWorkerThread_2]
  4. NettyRemotingAbstract
  com.alibaba.rocketmq.remoting.netty.NettyRemotingAbstract.processRequestCommand(ChannelHandlerContext, RemotingCommand)
  方法负责分发给相应的NettyRequestProcessor实例
  负责处理的是com.alibaba.rocketmq.broker.processor.SendMessageProcessor@17fb0278
  处理线程是:Thread [SendMessageThread_1],跟上面的处理已经不在一个线程上了,异步
  5. SendMessageProcessor
  SendMessageProcessor.processRequest(ChannelHandlerContext, RemotingCommand)介入处理
  构建SendMessageRequestHeader
  进入SendMessageProcessor.sendMessage(ChannelHandlerContext, RemotingCommand, SendMessageContext, SendMessageRequestHeader)
  int queueIdInt = requestHeader.getQueueId(); 作用是什么 什么时候确定的id
  构建MessageExtBrokerInner实例 tags转tagsCode 就是tags的String的hashcode
  PutMessageResult putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
  交给MessageStore进行put(store,进行存放)
  6. DefaultMessageStore
  com.alibaba.rocketmq.store.DefaultMessageStore.putMessage(MessageExtBrokerInner)
  消息topic的长度校验 不能超过127
  消息properties转成字符串后,长度不能超过32767
  下面交个CommitLog进行putMessage
  7. CommitLog
  com.alibaba.rocketmq.store.CommitLog.putMessage(MessageExtBrokerInner)
  设置存储时间
  设置消息体CRC校验值
  交给MapedFile进行存储mapedFile.appendMessage(msg, this.appendMessageCallback)
  8. MapedFile
  mapedFile.appendMessage(msg, this.appendMessageCallback)
  获取当前文件已经写到什么位置了
  文件已经做了map map到mappedByteBuffer 讲当前的mappedByteBuffer 割出来并设置position为当前写位置
  交由callback做append
  callback是CommitLog$DefaultAppendMessageCallback
  传给callback的参数有
  * 1. 这个文件的offset,意思是:一个broker要存储很多消息,那么一个文件肯定不够存,当存到第二个文件的时候,这个文件里的第一条消息相对于整个broker中的消息有个offset,此值是文件名。
  * 2. 从store文件映射的buffer中割出来的byteBuffer
  * 3. 文件的剩余空间
  * 4. 消息体
  8.1 callback中append逻辑:
  计算wroteOffset 即整个broker中的offset, 代码注释中称之为物理offset,计算逻辑也很简单即上面参数1+这条消息在这个文件中的position
  计算消息id,逻辑使用主机物理IP加上一步的wroteOffset计算,用了ByteBuffer处理,比较高效,待完善验证用例。
  获取这个topic的这个队列的offset(一个topic写多个队列),这个offset有啥用?估计是用于快速查找
  将消息bean对象用msgStoreItemMemory进行以约定消息体的形式进行byte的put
  将msgStoreItemMemory put到上面传进来的参数2的buffer 即写入磁盘
  构建append结果,没啥逻辑,主要是一些信息 AppendMessageResult
  将消息在这个队列中的offset加1
  更新MapedFile的wrotePostion,就是将原来的wrotePostion加上这次写入的字节数
  此时订阅断已经收到消息,(应该更早,待确认,待分析消息如何送到consume的)
  8.2 回CommitLog--:
  构建 PutMessageResult对象。
  8.3 处理同异步刷盘逻辑
  同步刷盘 用GroupCommitService, 其中使用了多条消息一起刷的设计,并且设计了刷盘如果超时的异常场景的反馈
  异步刷盘 用CommitLog$FlushRealTimeService(是一个线程,此时会叫醒他)
  8.4 处理主从双机同异步同步逻辑
  同步形式同步从节点 交由HAService 实时同步,并等待同步结果。
  异步不用管。
  9. 收集统计数据
  耗时等等
  10. 对非oneway的消息做response处理

0 个回复

您需要登录后才可以回帖 登录 | 加入黑马