A股上市公司传智教育(股票代码 003032)旗下技术交流社区北京昌平校区

1、概述
2、Producer 发送消息
DefaultMQProducer#send(Message)1: @Override2: public SendResult send(Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {3:     return this.defaultMQProducerImpl.send(msg);4: }
  • 说明:发送同步消息,DefaultMQProducer#send(Message) 对DefaultMQProducerImpl#send(Message) 进行封装。
DefaultMQProducerImpl#sendDefaultImpl()  1: public SendResult send(Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {  2:     return send(msg, this.defaultMQProducer.getSendMsgTimeout());  3: }  4:   5: public SendResult send(Message msg, long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {  6:     return this.sendDefaultImpl(msg, CommunicationMode.SYNC, null, timeout);  7: }  8:   9: private SendResult sendDefaultImpl(// 10:     Message msg, // 11:     final CommunicationMode communicationMode, // 12:     final SendCallback sendCallback, // 13:     final long timeout// 14: ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { 15:     // 校验 Producer 处于运行状态 16:     this.makeSureStateOK(); 17:     // 校验消息格式 18:     Validators.checkMessage(msg, this.defaultMQProducer); 19:     // 20:     final long invokeID = random.nextLong(); // 调用编号;用于下面打印日志,标记为同一次发送消息 21:     long beginTimestampFirst = System.currentTimeMillis(); 22:     long beginTimestampPrev = beginTimestampFirst; 23:     long endTimestamp = beginTimestampFirst; 24:     // 获取 Topic路由信息 25:     TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic()); 26:     if (topicPublishInfo != null && topicPublishInfo.ok()) { 27:         MessageQueue mq = null; // 最后选择消息要发送到的队列 28:         Exception exception = null; 29:         SendResult sendResult = null; // 最后一次发送结果 30:         int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1; // 同步多次调用 31:         int times = 0; // 第几次发送 32:         String[] brokersSent = new String[timesTotal]; // 存储每次发送消息选择的broker名 33:         // 循环调用发送消息,直到成功 34:         for (; times < timesTotal; times++) { 35:             String lastBrokerName = null == mq ? null : mq.getBrokerName(); 36:             MessageQueue tmpmq = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName); // 选择消息要发送到的队列 37:             if (tmpmq != null) { 38:                 mq = tmpmq; 39:                 brokersSent[times] = mq.getBrokerName(); 40:                 try { 41:                     beginTimestampPrev = System.currentTimeMillis(); 42:                     // 调用发送消息核心方法 43:                     sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout); 44:                     endTimestamp = System.currentTimeMillis(); 45:                     // 更新Broker可用性信息 46:                     this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false); 47:                     switch (communicationMode) { 48:                         case ASYNC: 49:                             return null; 50:                         case ONEWAY: 51:                             return null; 52:                         case SYNC: 53:                             if (sendResult.getSendStatus() != SendStatus.SEND_OK) { 54:                                 if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) { // 同步发送成功但存储有问题时 && 配置存储异常时重新发送开关 时,进行重试 55:                                     continue; 56:                                 } 57:                             } 58:                             return sendResult; 59:                         default: 60:                             break; 61:                     } 62:                 } catch (RemotingException e) { // 打印异常,更新Broker可用性信息,更新继续循环 63:                     endTimestamp = System.currentTimeMillis(); 64:                     this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true); 65:                     log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e); 66:                     log.warn(msg.toString()); 67:                     exception = e; 68:                     continue; 69:                 } catch (MQClientException e) { // 打印异常,更新Broker可用性信息,继续循环 70:                     endTimestamp = System.currentTimeMillis(); 71:                     this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true); 72:                     log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e); 73:                     log.warn(msg.toString()); 74:                     exception = e; 75:                     continue; 76:                 } catch (MQBrokerException e) { // 打印异常,更新Broker可用性信息,部分情况下的异常,直接返回,结束循环 77:                     endTimestamp = System.currentTimeMillis(); 78:                     this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true); 79:                     log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e); 80:                     log.warn(msg.toString()); 81:                     exception = e; 82:                     switch (e.getResponseCode()) { 83:                         // 如下异常continue,进行发送消息重试 84:                         case ResponseCode.TOPIC_NOT_EXIST: 85:                         case ResponseCode.SERVICE_NOT_AVAILABLE: 86:                         case ResponseCode.SYSTEM_ERROR: 87:                         case ResponseCode.NO_PERMISSION: 88:                         case ResponseCode.NO_BUYER_ID: 89:                         case ResponseCode.NOT_IN_CURRENT_UNIT: 90:                             continue; 91:                         // 如果有发送结果,进行返回,否则,抛出异常; 92:                         default: 93:                             if (sendResult != null) { 94:                                 return sendResult; 95:                             } 96:                             throw e; 97:                     } 98:                 } catch (InterruptedException e) { 99:                     endTimestamp = System.currentTimeMillis();100:                     this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);101:                     log.warn(String.format("sendKernelImpl exception, throw exception, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);102:                     log.warn(msg.toString());103:                     throw e;104:                 }105:             } else {106:                 break;107:             }108:         }109:         // 返回发送结果110:         if (sendResult != null) {111:             return sendResult;112:         }113:         // 根据不同情况,抛出不同的异常114:         String info = String.format("Send [%d] times, still failed, cost [%d]ms, Topic: %s, BrokersSent: %s", times, System.currentTimeMillis() - beginTimestampFirst,115:                 msg.getTopic(), Arrays.toString(brokersSent)) + FAQUrl.suggestTodo(FAQUrl.SEND_MSG_FAILED);116:         MQClientException mqClientException = new MQClientException(info, exception);117:         if (exception instanceof MQBrokerException) {118:             mqClientException.setResponseCode(((MQBrokerException) exception).getResponseCode());119:         } else if (exception instanceof RemotingConnectException) {120:             mqClientException.setResponseCode(ClientErrorCode.CONNECT_BROKER_EXCEPTION);121:         } else if (exception instanceof RemotingTimeoutException) {122:             mqClientException.setResponseCode(ClientErrorCode.ACCESS_BROKER_TIMEOUT);123:         } else if (exception instanceof MQClientException) {124:             mqClientException.setResponseCode(ClientErrorCode.BROKER_NOT_EXIST_EXCEPTION);125:         }126:         throw mqClientException;127:     }128:     // Namesrv找不到异常129:     List<String> nsList = this.getmQClientFactory().getMQClientAPIImpl().getNameServerAddressList();130:     if (null == nsList || nsList.isEmpty()) {131:         throw new MQClientException(132:             "No name server address, please set it." + FAQUrl.suggestTodo(FAQUrl.NAME_SERVER_ADDR_NOT_EXIST_URL), null).setResponseCode(ClientErrorCode.NO_NAME_SERVER_EXCEPTION);133:     }134:     // 消息路由找不到异常135:     throw new MQClientException("No route info of this topic, " + msg.getTopic() + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO),136:         null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION);137: }
  • 说明 :发送消息。步骤:获取消息路由信息,选择要发送到的消息队列,执行消息发送核心方法,并对发送结果进行封装返回。
  • 第 1 至 7 行:对sendsendDefaultImpl(...)进行封装。
  • 第 20 行 :invokeID仅仅用于打印日志,无实际的业务用途。
  • 第 25 行 :获取 Topic路由信息, 详细解析见:DefaultMQProducerImpl#tryToFindTopicPublishInfo()
  • 第 30 & 34 行 :计算调用发送消息到成功为止的最大次数,并进行循环。同步或异步发送消息会调用多次,默认配置为3次。
  • 第 36 行 :选择消息要发送到的队列,详细解析见:MQFaultStrategy
  • 第 43 行 :调用发送消息核心方法,详细解析见:DefaultMQProducerImpl#sendKernelImpl()
  • 第 46 行 :更新Broker可用性信息。在选择发送到的消息队列时,会参考Broker发送消息的延迟,详细解析见:MQFaultStrategy
  • 第 62 至 68 行:当抛出RemotingException时,如果进行消息发送失败重试,则可能导致消息发送重复。例如,发送消息超时(RemotingTimeoutException),实际Broker接收到该消息并处理成功。因此,Consumer在消费时,需要保证幂等性。
DefaultMQProducerImpl#tryToFindTopicPublishInfo() 1: private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) { 2:     // 缓存中获取 Topic发布信息 3:     TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic); 4:     // 当无可用的 Topic发布信息时,从Namesrv获取一次 5:     if (null == topicPublishInfo || !topicPublishInfo.ok()) { 6:         this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo()); 7:         this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic); 8:         topicPublishInfo = this.topicPublishInfoTable.get(topic); 9:     }10:     // 若获取的 Topic发布信息时候可用,则返回11:     if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {12:         return topicPublishInfo;13:     } else { // 使用 {@link DefaultMQProducer#createTopicKey} 对应的 Topic发布信息。用于 Topic发布信息不存在 && Broker支持自动创建Topic14:         this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);15:         topicPublishInfo = this.topicPublishInfoTable.get(topic);16:         return topicPublishInfo;17:     }18: }
  • 说明 :获得 Topic发布信息。优先从缓存topicPublishInfoTable,其次从Namesrv中获得。
  • 第 3 行 :从缓存topicPublishInfoTable中获得 Topic发布信息。
  • 第 5 至 9 行 :从 Namesrv 中获得 Topic发布信息。
  • 第 13 至 17 行 :当从 Namesrv 无法获取时,使用 {@link DefaultMQProducer#createTopicKey} 对应的 Topic发布信息。目的是当 Broker 开启自动创建 Topic开关时,Broker 接收到消息后自动创建Topic,详细解析见《RocketMQ 源码分析 —— Topic》
MQFaultStrategy
MQFaultStrategy 1: public class MQFaultStrategy { 2:     private final static Logger log = ClientLogger.getLog(); 3:  4:     /** 5:      * 延迟故障容错,维护每个Broker的发送消息的延迟 6:      * key:brokerName 7:      */ 8:     private final LatencyFaultTolerance<String> latencyFaultTolerance = new LatencyFaultToleranceImpl(); 9:     /**10:      * 发送消息延迟容错开关11:      */12:     private boolean sendLatencyFaultEnable = false;13:     /**14:      * 延迟级别数组15:      */16:     private long[] latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};17:     /**18:      * 不可用时长数组19:      */20:     private long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};21: 22:     /**23:      * 根据 Topic发布信息 选择一个消息队列24:      *25:      * @param tpInfo Topic发布信息26:      * @param lastBrokerName brokerName27:      * @return 消息队列28:      */29:     public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {30:         if (this.sendLatencyFaultEnable) {31:             try {32:                 // 获取 brokerName=lastBrokerName && 可用的一个消息队列33:                 int index = tpInfo.getSendWhichQueue().getAndIncrement();34:                 for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {35:                     int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();36:                     if (pos < 0)37:                         pos = 0;38:                     MessageQueue mq = tpInfo.getMessageQueueList().get(pos);39:                     if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {40:                         if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))41:                             return mq;42:                     }43:                 }44:                 // 选择一个相对好的broker,并获得其对应的一个消息队列,不考虑该队列的可用性45:                 final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();46:                 int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);47:                 if (writeQueueNums > 0) {48:                     final MessageQueue mq = tpInfo.selectOneMessageQueue();49:                     if (notBestBroker != null) {50:                         mq.setBrokerName(notBestBroker);51:                         mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);52:                     }53:                     return mq;54:                 } else {55:                     latencyFaultTolerance.remove(notBestBroker);56:                 }57:             } catch (Exception e) {58:                 log.error("Error occurred when selecting message queue", e);59:             }60:             // 选择一个消息队列,不考虑队列的可用性61:             return tpInfo.selectOneMessageQueue();62:         }63:         // 获得 lastBrokerName 对应的一个消息队列,不考虑该队列的可用性64:         return tpInfo.selectOneMessageQueue(lastBrokerName);65:     }66: 67:     /**68:      * 更新延迟容错信息69:      *70:      * @param brokerName brokerName71:      * @param currentLatency 延迟72:      * @param isolation 是否隔离。当开启隔离时,默认延迟为30000。目前主要用于发送消息异常时73:      */74:     public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {75:         if (this.sendLatencyFaultEnable) {76:             long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency);77:             this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration);78:         }79:     }80: 81:     /**82:      * 计算延迟对应的不可用时间83:      *84:      * @param currentLatency 延迟85:      * @return 不可用时间86:      */87:     private long computeNotAvailableDuration(final long currentLatency) {88:         for (int i = latencyMax.length - 1; i >= 0; i--) {89:             if (currentLatency >= latencyMax)90:                 return this.notAvailableDuration;91:         }92:         return 0;93:     }
  • 说明 :Producer消息发送容错策略。默认情况下容错策略关闭,即sendLatencyFaultEnable=false。
  • 第 30 至 62 行 :容错策略选择消息队列逻辑。优先获取可用队列,其次选择一个broker获取队列,最差返回任意broker的一个队列。
  • 第 64 行 :未开启容错策略选择消息队列逻辑。
  • 第 74 至 79 行 :更新延迟容错信息。当 Producer 发送消息时间过长,则逻辑认为N秒内不可用。按照latencyMax,notAvailableDuration的配置,对应如下:
    | Producer发送消息消耗时长 | Broker不可用时长 |
    | — | — |
    | >= 15000 ms | 600 1000 ms |
    | >= 3000 ms | 180 1000 ms |
    | >= 2000 ms | 120 1000 ms |
    | >= 1000 ms | 60 1000 ms |
    | >= 550 ms | 30 * 1000 ms |
    | >= 100 ms | 0 ms |
    | >= 50 ms | 0 ms |

LatencyFaultTolerance 1: public interface LatencyFaultTolerance<T> { 2:  3:     /** 4:      * 更新对应的延迟和不可用时长 5:      * 6:      * @param name 对象 7:      * @param currentLatency 延迟 8:      * @param notAvailableDuration 不可用时长 9:      */10:     void updateFaultItem(final T name, final long currentLatency, final long notAvailableDuration);11: 12:     /**13:      * 对象是否可用14:      *15:      * @param name 对象16:      * @return 是否可用17:      */18:     boolean isAvailable(final T name);19: 20:     /**21:      * 移除对象22:      *23:      * @param name 对象24:      */25:     void remove(final T name);26: 27:     /**28:      * 获取一个对象29:      *30:      * @return 对象31:      */32:     T pickOneAtLeast();33: }
  • 说明 :延迟故障容错接口
LatencyFaultToleranceImpl 1: public class LatencyFaultToleranceImpl implements LatencyFaultTolerance<String> { 2:  3:     /** 4:      * 对象故障信息Table 5:      */ 6:     private final ConcurrentHashMap<String, FaultItem> faultItemTable = new ConcurrentHashMap<>(16); 7:     /** 8:      * 对象选择Index 9:      * @see #pickOneAtLeast()10:      */11:     private final ThreadLocalIndex whichItemWorst = new ThreadLocalIndex();12: 13:     @Override14:     public void updateFaultItem(final String name, final long currentLatency, final long notAvailableDuration) {15:         FaultItem old = this.faultItemTable.get(name);16:         if (null == old) {17:             // 创建对象18:             final FaultItem faultItem = new FaultItem(name);19:             faultItem.setCurrentLatency(currentLatency);20:             faultItem.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);21:             // 更新对象22:             old = this.faultItemTable.putIfAbsent(name, faultItem);23:             if (old != null) {24:                 old.setCurrentLatency(currentLatency);25:                 old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);26:             }27:         } else { // 更新对象28:             old.setCurrentLatency(currentLatency);29:             old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);30:         }31:     }32: 33:     @Override34:     public boolean isAvailable(final String name) {35:         final FaultItem faultItem = this.faultItemTable.get(name);36:         if (faultItem != null) {37:             return faultItem.isAvailable();38:         }39:         return true;40:     }41: 42:     @Override43:     public void remove(final String name) {44:         this.faultItemTable.remove(name);45:     }46: 47:     /**48:      * 选择一个相对优秀的对象49:      *50:      * @return 对象51:      */52:     @Override53:     public String pickOneAtLeast() {54:         // 创建数组55:         final Enumeration<FaultItem> elements = this.faultItemTable.elements();56:         List<FaultItem> tmpList = new LinkedList<>();57:         while (elements.hasMoreElements()) {58:             final FaultItem faultItem = elements.nextElement();59:             tmpList.add(faultItem);60:         }61:         //62:         if (!tmpList.isEmpty()) {63:             // 打乱 + 排序。TODO 疑问:应该只能二选一。猜测Collections.shuffle(tmpList)去掉。64:             Collections.shuffle(tmpList);65:             Collections.sort(tmpList);66:             // 选择顺序在前一半的对象67:             final int half = tmpList.size() / 2;68:             if (half <= 0) {69:                 return tmpList.get(0).getName();70:             } else {71:                 final int i = this.whichItemWorst.getAndIncrement() % half;72:                 return tmpList.get(i).getName();73:             }74:         }75:         return null;76:     }77: }
  • 说明 :延迟故障容错实现。维护每个对象的信息。
FaultItem 1: class FaultItem implements Comparable<FaultItem> { 2:     /** 3:      * 对象名 4:      */ 5:     private final String name; 6:     /** 7:      * 延迟 8:      */ 9:     private volatile long currentLatency;10:     /**11:      * 开始可用时间12:      */13:     private volatile long startTimestamp;14: 15:     public FaultItem(final String name) {16:         this.name = name;17:     }18: 19:     /**20:      * 比较对象21:      * 可用性 > 延迟 > 开始可用时间22:      *23:      * @param other other24:      * @return 升序25:      */26:     @Override27:     public int compareTo(final FaultItem other) {28:         if (this.isAvailable() != other.isAvailable()) {29:             if (this.isAvailable())30:                 return -1;31: 32:             if (other.isAvailable())33:                 return 1;34:         }35: 36:         if (this.currentLatency < other.currentLatency)37:             return -1;38:         else if (this.currentLatency > other.currentLatency) {39:             return 1;40:         }41: 42:         if (this.startTimestamp < other.startTimestamp)43:             return -1;44:         else if (this.startTimestamp > other.startTimestamp) {45:             return 1;46:         }47: 48:         return 0;49:     }50: 51:     /**52:      * 是否可用:当开始可用时间大于当前时间53:      *54:      * @return 是否可用55:      */56:     public boolean isAvailable() {57:         return (System.currentTimeMillis() - startTimestamp) >= 0;58:     }59: 60:     @Override61:     public int hashCode() {62:         int result = getName() != null ? getName().hashCode() : 0;63:         result = 31 * result + (int) (getCurrentLatency() ^ (getCurrentLatency() >>> 32));64:         result = 31 * result + (int) (getStartTimestamp() ^ (getStartTimestamp() >>> 32));65:         return result;66:     }67: 68:     @Override69:     public boolean equals(final Object o) {70:         if (this == o)71:             return true;72:         if (!(o instanceof FaultItem))73:             return false;74: 75:         final FaultItem faultItem = (FaultItem) o;76: 77:         if (getCurrentLatency() != faultItem.getCurrentLatency())78:             return false;79:         if (getStartTimestamp() != faultItem.getStartTimestamp())80:             return false;81:         return getName() != null ? getName().equals(faultItem.getName()) : faultItem.getName() == null;82: 83:     }84: }
  • 说明 :对象故障信息。维护对象的名字、延迟、开始可用的时间。
DefaultMQProducerImpl#sendKernelImpl()  1: private SendResult sendKernelImpl(final Message msg, //  2:     final MessageQueue mq, //  3:     final CommunicationMode communicationMode, //  4:     final SendCallback sendCallback, //  5:     final TopicPublishInfo topicPublishInfo, //  6:     final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {  7:     // 获取 broker地址  8:     String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());  9:     if (null == brokerAddr) { 10:         tryToFindTopicPublishInfo(mq.getTopic()); 11:         brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName()); 12:     } 13:     // 14:     SendMessageContext context = null; 15:     if (brokerAddr != null) { 16:         // 是否使用broker vip通道。broker会开启两个端口对外服务。 17:         brokerAddr = MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(), brokerAddr); 18:         byte[] prevBody = msg.getBody(); // 记录消息内容。下面逻辑可能改变消息内容,例如消息压缩。 19:         try { 20:             // 设置唯一编号 21:             MessageClientIDSetter.setUniqID(msg); 22:             // 消息压缩 23:             int sysFlag = 0; 24:             if (this.tryToCompressMessage(msg)) { 25:                 sysFlag |= MessageSysFlag.COMPRESSED_FLAG; 26:             } 27:             // 事务 28:             final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED); 29:             if (tranMsg != null && Boolean.parseBoolean(tranMsg)) { 30:                 sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE; 31:             } 32:             // hook:发送消息校验 33:             if (hasCheckForbiddenHook()) { 34:                 CheckForbiddenContext checkForbiddenContext = new CheckForbiddenContext(); 35:                 checkForbiddenContext.setNameSrvAddr(this.defaultMQProducer.getNamesrvAddr()); 36:                 checkForbiddenContext.setGroup(this.defaultMQProducer.getProducerGroup()); 37:                 checkForbiddenContext.setCommunicationMode(communicationMode); 38:                 checkForbiddenContext.setBrokerAddr(brokerAddr); 39:                 checkForbiddenContext.setMessage(msg); 40:                 checkForbiddenContext.setMq(mq); 41:                 checkForbiddenContext.setUnitMode(this.isUnitMode()); 42:                 this.executeCheckForbiddenHook(checkForbiddenContext); 43:             } 44:             // hook:发送消息前逻辑 45:             if (this.hasSendMessageHook()) { 46:                 context = new SendMessageContext(); 47:                 context.setProducer(this); 48:                 context.setProducerGroup(this.defaultMQProducer.getProducerGroup()); 49:                 context.setCommunicationMode(communicationMode); 50:                 context.setBornHost(this.defaultMQProducer.getClientIP()); 51:                 context.setBrokerAddr(brokerAddr); 52:                 context.setMessage(msg); 53:                 context.setMq(mq); 54:                 String isTrans = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED); 55:                 if (isTrans != null && isTrans.equals("true")) { 56:                     context.setMsgType(MessageType.Trans_Msg_Half); 57:                 } 58:                 if (msg.getProperty("__STARTDELIVERTIME") != null || msg.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null) { 59:                     context.setMsgType(MessageType.Delay_Msg); 60:                 } 61:                 this.executeSendMessageHookBefore(context); 62:             } 63:             // 构建发送消息请求 64:             SendMessageRequestHeader requestHeader = new SendMessageRequestHeader(); 65:             requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup()); 66:             requestHeader.setTopic(msg.getTopic()); 67:             requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey()); 68:             requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums()); 69:             requestHeader.setQueueId(mq.getQueueId()); 70:             requestHeader.setSysFlag(sysFlag); 71:             requestHeader.setBornTimestamp(System.currentTimeMillis()); 72:             requestHeader.setFlag(msg.getFlag()); 73:             requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties())); 74:             requestHeader.setReconsumeTimes(0); 75:             requestHeader.setUnitMode(this.isUnitMode()); 76:             if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { // 消息重发Topic 77:                 String reconsumeTimes = MessageAccessor.getReconsumeTime(msg); 78:                 if (reconsumeTimes != null) { 79:                     requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes)); 80:                     MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME); 81:                 } 82:                 String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg); 83:                 if (maxReconsumeTimes != null) { 84:                     requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes)); 85:                     MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_MAX_RECONSUME_TIMES); 86:                 } 87:             } 88:             // 发送消息 89:             SendResult sendResult = null; 90:             switch (communicationMode) { 91:                 case ASYNC: 92:                     sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(// 93:                         brokerAddr, // 1 94:                         mq.getBrokerName(), // 2 95:                         msg, // 3 96:                         requestHeader, // 4 97:                         timeout, // 5 98:                         communicationMode, // 6 99:                         sendCallback, // 7100:                         topicPublishInfo, // 8101:                         this.mQClientFactory, // 9102:                         this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(), // 10103:                         context, //104:                         this);105:                     break;106:                 case ONEWAY:107:                 case SYNC:108:                     sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(109:                         brokerAddr,110:                         mq.getBrokerName(),111:                         msg,112:                         requestHeader,113:                         timeout,114:                         communicationMode,115:                         context,116:                         this);117:                     break;118:                 default:119:                     assert false;120:                     break;121:             }122:             // hook:发送消息后逻辑123:             if (this.hasSendMessageHook()) {124:                 context.setSendResult(sendResult);125:                 this.executeSendMessageHookAfter(context);126:             }127:             // 返回发送结果128:             return sendResult;129:         } catch (RemotingException e) {130:             if (this.hasSendMessageHook()) {131:                 context.setException(e);132:                 this.executeSendMessageHookAfter(context);133:             }134:             throw e;135:         } catch (MQBrokerException e) {136:             if (this.hasSendMessageHook()) {137:                 context.setException(e);138:                 this.executeSendMessageHookAfter(context);139:             }140:             throw e;141:         } catch (InterruptedException e) {142:             if (this.hasSendMessageHook()) {143:                 context.setException(e);144:                 this.executeSendMessageHookAfter(context);145:             }146:             throw e;147:         } finally {148:             msg.setBody(prevBody);149:         }150:     }151:     // broker为空抛出异常152:     throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);153: }
  • 说明 :发送消息核心方法。该方法真正发起网络请求,发送消息给 Broker。
  • 第 21 行 :生产消息编号,详细解析见《RocketMQ 源码分析 —— Message 基础》
  • 第 64 至 121 行 :构建发送消息请求SendMessageRequestHeader。
  • 第 107 至 117 行 :执行 MQClientInstance#sendMessage(...) 发起网络请求。
3、Broker 接收消息
SendMessageProcessor#sendMessage  1: @Override  2: public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {  3:     SendMessageContext mqtraceContext;  4:     switch (request.getCode()) {  5:         case RequestCode.CONSUMER_SEND_MSG_BACK:  6:             return this.consumerSendMsgBack(ctx, request);  7:         default:  8:             // 解析请求  9:             SendMessageRequestHeader requestHeader = parseRequestHeader(request); 10:             if (requestHeader == null) { 11:                 return null; 12:             } 13:             // 发送请求Context。在 hook 场景下使用 14:             mqtraceContext = buildMsgContext(ctx, requestHeader); 15:             // hook:处理发送消息前逻辑 16:             this.executeSendMessageHookBefore(ctx, request, mqtraceContext); 17:             // 处理发送消息逻辑 18:             final RemotingCommand response = this.sendMessage(ctx, request, mqtraceContext, requestHeader); 19:             // hook:处理发送消息后逻辑 20:             this.executeSendMessageHookAfter(response, mqtraceContext); 21:             return response; 22:     } 23: } 24:  25: private RemotingCommand sendMessage(final ChannelHandlerContext ctx, // 26:     final RemotingCommand request, // 27:     final SendMessageContext sendMessageContext, // 28:     final SendMessageRequestHeader requestHeader) throws RemotingCommandException { 29:  30:     // 初始化响应 31:     final RemotingCommand response = RemotingCommand.createResponseCommand(SendMessageResponseHeader.class); 32:     final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader) response.readCustomHeader(); 33:     response.setOpaque(request.getOpaque()); 34:     response.addExtField(MessageConst.PROPERTY_MSG_REGION, this.brokerController.getBrokerConfig().getRegionId()); 35:     response.addExtField(MessageConst.PROPERTY_TRACE_SWITCH, String.valueOf(this.brokerController.getBrokerConfig().isTraceOn())); 36:  37:     if (log.isDebugEnabled()) { 38:         log.debug("receive SendMessage request command, {}", request); 39:     } 40:  41:     // 如果未开始接收消息,抛出系统异常 42:     @SuppressWarnings("SpellCheckingInspection") 43:     final long startTimstamp = this.brokerController.getBrokerConfig().getStartAcceptSendRequestTimeStamp(); 44:     if (this.brokerController.getMessageStore().now() < startTimstamp) { 45:         response.setCode(ResponseCode.SYSTEM_ERROR); 46:         response.setRemark(String.format("broker unable to service, until %s", UtilAll.timeMillisToHumanString2(startTimstamp))); 47:         return response; 48:     } 49:  50:     // 消息配置(Topic配置)校验 51:     response.setCode(-1); 52:     super.msgCheck(ctx, requestHeader, response); 53:     if (response.getCode() != -1) { 54:         return response; 55:     } 56:  57:     final byte[] body = request.getBody(); 58:  59:     // 如果队列小于0,从可用队列随机选择 60:     int queueIdInt = requestHeader.getQueueId(); 61:     TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic()); 62:     if (queueIdInt < 0) { 63:         queueIdInt = Math.abs(this.random.nextInt() % 99999999) % topicConfig.getWriteQueueNums(); 64:     } 65:  66:     // 67:     int sysFlag = requestHeader.getSysFlag(); 68:     if (TopicFilterType.MULTI_TAG == topicConfig.getTopicFilterType()) { 69:         sysFlag |= MessageSysFlag.MULTI_TAGS_FLAG; 70:     } 71:  72:     // 对RETRY类型的消息处理。如果超过最大消费次数,则topic修改成"%DLQ%" + 分组名,即加入 死信队列(Dead Letter Queue) 73:     String newTopic = requestHeader.getTopic(); 74:     if (null != newTopic && newTopic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { 75:         // 获取订阅分组配置 76:         String groupName = newTopic.substring(MixAll.RETRY_GROUP_TOPIC_PREFIX.length()); 77:         SubscriptionGroupConfig subscriptionGroupConfig = 78:             this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(groupName); 79:         if (null == subscriptionGroupConfig) { 80:             response.setCode(ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST); 81:             response.setRemark("subscription group not exist, " + groupName + " " + FAQUrl.suggestTodo(FAQUrl.SUBSCRIPTION_GROUP_NOT_EXIST)); 82:             return response; 83:         } 84:         // 计算最大可消费次数 85:         int maxReconsumeTimes = subscriptionGroupConfig.getRetryMaxTimes(); 86:         if (request.getVersion() >= MQVersion.Version.V3_4_9.ordinal()) { 87:             maxReconsumeTimes = requestHeader.getMaxReconsumeTimes(); 88:         } 89:         int reconsumeTimes = requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes(); 90:         if (reconsumeTimes >= maxReconsumeTimes) { // 超过最大消费次数 91:             newTopic = MixAll.getDLQTopic(groupName); 92:             queueIdInt = Math.abs(this.random.nextInt() % 99999999) % DLQ_NUMS_PER_GROUP; 93:             topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic, // 94:                 DLQ_NUMS_PER_GROUP, // 95:                 PermName.PERM_WRITE, 0 96:             ); 97:             if (null == topicConfig) { 98:                 response.setCode(ResponseCode.SYSTEM_ERROR); 99:                 response.setRemark("topic[" + newTopic + "] not exist");100:                 return response;101:             }102:         }103:     }104: 105:     // 创建MessageExtBrokerInner106:     MessageExtBrokerInner msgInner = new MessageExtBrokerInner();107:     msgInner.setTopic(newTopic);108:     msgInner.setBody(body);109:     msgInner.setFlag(requestHeader.getFlag());110:     MessageAccessor.setProperties(msgInner, MessageDecoder.string2messageProperties(requestHeader.getProperties()));111:     msgInner.setPropertiesString(requestHeader.getProperties());112:     msgInner.setTagsCode(MessageExtBrokerInner.tagsString2tagsCode(topicConfig.getTopicFilterType(), msgInner.getTags()));113:     msgInner.setQueueId(queueIdInt);114:     msgInner.setSysFlag(sysFlag);115:     msgInner.setBornTimestamp(requestHeader.getBornTimestamp());116:     msgInner.setBornHost(ctx.channel().remoteAddress());117:     msgInner.setStoreHost(this.getStoreHost());118:     msgInner.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes());119: 120:     // 校验是否不允许发送事务消息121:     if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {122:         String traFlag = msgInner.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);123:         if (traFlag != null) {124:             response.setCode(ResponseCode.NO_PERMISSION);125:             response.setRemark(126:                 "the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1() + "] sending transaction message is forbidden");127:             return response;128:         }129:     }130: 131:     // 添加消息132:     PutMessageResult putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);133:     if (putMessageResult != null) {134:         boolean sendOK = false;135: 136:         switch (putMessageResult.getPutMessageStatus()) {137:             // Success138:             case PUT_OK:139:                 sendOK = true;140:                 response.setCode(ResponseCode.SUCCESS);141:                 break;142:             case FLUSH_DISK_TIMEOUT:143:                 response.setCode(ResponseCode.FLUSH_DISK_TIMEOUT);144:                 sendOK = true;145:                 break;146:             case FLUSH_SLAVE_TIMEOUT:147:                 response.setCode(ResponseCode.FLUSH_SLAVE_TIMEOUT);148:                 sendOK = true;149:                 break;150:             case SLAVE_NOT_AVAILABLE:151:                 response.setCode(ResponseCode.SLAVE_NOT_AVAILABLE);152:                 sendOK = true;153:                 break;154: 155:             // Failed156:             case CREATE_MAPEDFILE_FAILED:157:                 response.setCode(ResponseCode.SYSTEM_ERROR);158:                 response.setRemark("create mapped file failed, server is busy or broken.");159:                 break;160:             case MESSAGE_ILLEGAL:161:             case PROPERTIES_SIZE_EXCEEDED:162:                 response.setCode(ResponseCode.MESSAGE_ILLEGAL);163:                 response.setRemark(164:                     "the message is illegal, maybe msg body or properties length not matched. msg body length limit 128k, msg properties length limit 32k.");165:                 break;166:             case SERVICE_NOT_AVAILABLE:167:                 response.setCode(ResponseCode.SERVICE_NOT_AVAILABLE);168:                 response.setRemark(169:                     "service not available now, maybe disk full, " + diskUtil() + ", maybe your broker machine memory too small.");170:                 break;171:             case OS_PAGECACHE_BUSY:172:                 response.setCode(ResponseCode.SYSTEM_ERROR);173:                 response.setRemark("[PC_SYNCHRONIZED]broker busy, start flow control for a while");174:                 break;175:             case UNKNOWN_ERROR:176:                 response.setCode(ResponseCode.SYSTEM_ERROR);177:                 response.setRemark("UNKNOWN_ERROR");178:                 break;179:             default:180:                 response.setCode(ResponseCode.SYSTEM_ERROR);181:                 response.setRemark("UNKNOWN_ERROR DEFAULT");182:                 break;183:         }184: 185:         String owner = request.getExtFields().get(BrokerStatsManager.COMMERCIAL_OWNER);186:         if (sendOK) {187:             // 统计188:             this.brokerController.getBrokerStatsManager().incTopicPutNums(msgInner.getTopic());189:             this.brokerController.getBrokerStatsManager().incTopicPutSize(msgInner.getTopic(), putMessageResult.getAppendMessageResult().getWroteBytes());190:             this.brokerController.getBrokerStatsManager().incBrokerPutNums();191: 192:             // 响应193:             response.setRemark(null);194:             responseHeader.setMsgId(putMessageResult.getAppendMessageResult().getMsgId());195:             responseHeader.setQueueId(queueIdInt);196:             responseHeader.setQueueOffset(putMessageResult.getAppendMessageResult().getLogicsOffset());197:             doResponse(ctx, request, response);198: 199:             // hook:设置发送成功到context200:             if (hasSendMessageHook()) {201:                 sendMessageContext.setMsgId(responseHeader.getMsgId());202:                 sendMessageContext.setQueueId(responseHeader.getQueueId());203:                 sendMessageContext.setQueueOffset(responseHeader.getQueueOffset());204: 205:                 int commercialBaseCount = brokerController.getBrokerConfig().getCommercialBaseCount();206:                 int wroteSize = putMessageResult.getAppendMessageResult().getWroteBytes();207:                 int incValue = (int) Math.ceil(wroteSize / BrokerStatsManager.SIZE_PER_COUNT) * commercialBaseCount;208: 209:                 sendMessageContext.setCommercialSendStats(BrokerStatsManager.StatsType.SEND_SUCCESS);210:                 sendMessageContext.setCommercialSendTimes(incValue);211:                 sendMessageContext.setCommercialSendSize(wroteSize);212:                 sendMessageContext.setCommercialOwner(owner);213:             }214:             return null;215:         } else {216:             // hook:设置发送失败到context217:             if (hasSendMessageHook()) {218:                 int wroteSize = request.getBody().length;219:                 int incValue = (int) Math.ceil(wroteSize / BrokerStatsManager.SIZE_PER_COUNT);220: 221:                 sendMessageContext.setCommercialSendStats(BrokerStatsManager.StatsType.SEND_FAILURE);222:                 sendMessageContext.setCommercialSendTimes(incValue);223:                 sendMessageContext.setCommercialSendSize(wroteSize);224:                 sendMessageContext.setCommercialOwner(owner);225:             }226:         }227:     } else {228:         response.setCode(ResponseCode.SYSTEM_ERROR);229:         response.setRemark("store putMessage return null");230:     }231: 232:     return response;233: }
  • #processRequest() 说明 :处理消息请求。
  • #sendMessage() 说明 :发送消息,并返回发送消息结果。
  • 第 51 至 55 行 :消息配置(Topic配置)校验,详细解析见:AbstractSendMessageProcessor#msgCheck()
  • 第 60 至 64 行 :消息队列编号小于0时,Broker 可以设置随机选择一个消息队列。
  • 第 72 至 103 行 :对RETRY类型的消息处理。如果超过最大消费次数,则topic修改成”%DLQ%” + 分组名, 即加 死信队 (Dead Letter Queue),详细解析见:《RocketMQ 源码分析 —— Topic》
  • 第 105 至 118 行 :创建MessageExtBrokerInner。
  • 第 132 :存储消息,详细解析见:DefaultMessageStore#putMessage()
  • 第 133 至 183 行 :处理消息发送结果,设置响应结果和提示。
  • 第 186 至 214 行 :发送成功,响应。这里doResponse(ctx, request, response)进行响应,最后return null,原因是:响应给 Producer 可能发生异常,#doResponse(ctx, request, response)捕捉了该异常并输出日志。这样做的话,我们进行排查 Broker 接收消息成功后响应是否存在异常会方便很多。
AbstractSendMessageProcessor#msgCheck 1: protected RemotingCommand msgCheck(final ChannelHandlerContext ctx, 2:                                    final SendMessageRequestHeader requestHeader, final RemotingCommand response) { 3:     // 检查 broker 是否有写入权限 4:     if (!PermName.isWriteable(this.brokerController.getBrokerConfig().getBrokerPermission()) 5:         && this.brokerController.getTopicConfigManager().isOrderTopic(requestHeader.getTopic())) { 6:         response.setCode(ResponseCode.NO_PERMISSION); 7:         response.setRemark("the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1() 8:             + "] sending message is forbidden"); 9:         return response;10:     }11:     // 检查topic是否可以被发送。目前是{@link MixAll.DEFAULT_TOPIC}不被允许发送12:     if (!this.brokerController.getTopicConfigManager().isTopicCanSendMessage(requestHeader.getTopic())) {13:         String errorMsg = "the topic[" + requestHeader.getTopic() + "] is conflict with system reserved words.";14:         log.warn(errorMsg);15:         response.setCode(ResponseCode.SYSTEM_ERROR);16:         response.setRemark(errorMsg);17:         return response;18:     }19:     TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());20:     if (null == topicConfig) { // 不能存在topicConfig,则进行创建21:         int topicSysFlag = 0;22:         if (requestHeader.isUnitMode()) {23:             if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {24:                 topicSysFlag = TopicSysFlag.buildSysFlag(false, true);25:             } else {26:                 topicSysFlag = TopicSysFlag.buildSysFlag(true, false);27:             }28:         }29:         // 创建topic配置30:         log.warn("the topic {} not exist, producer: {}", requestHeader.getTopic(), ctx.channel().remoteAddress());31:         topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageMethod(//32:             requestHeader.getTopic(), //33:             requestHeader.getDefaultTopic(), //34:             RemotingHelper.parseChannelRemoteAddr(ctx.channel()), //35:             requestHeader.getDefaultTopicQueueNums(), topicSysFlag);36:         if (null == topicConfig) {37:             if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {38:                 topicConfig =39:                     this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(40:                         requestHeader.getTopic(), 1, PermName.PERM_WRITE | PermName.PERM_READ,41:                         topicSysFlag);42:             }43:         }44:         // 如果没配置45:         if (null == topicConfig) {46:             response.setCode(ResponseCode.TOPIC_NOT_EXIST);47:             response.setRemark("topic[" + requestHeader.getTopic() + "] not exist, apply first please!"48:                 + FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL));49:             return response;50:         }51:     }52:     // 队列编号是否正确53:     int queueIdInt = requestHeader.getQueueId();54:     int idValid = Math.max(topicConfig.getWriteQueueNums(), topicConfig.getReadQueueNums());55:     if (queueIdInt >= idValid) {56:         String errorInfo = String.format("request queueId[%d] is illegal, %s Producer: %s",57:             queueIdInt,58:             topicConfig.toString(),59:             RemotingHelper.parseChannelRemoteAddr(ctx.channel()));60:         log.warn(errorInfo);61:         response.setCode(ResponseCode.SYSTEM_ERROR);62:         response.setRemark(errorInfo);63:         return response;64:     }65:     return response;66: }
  • 说明:校验消息是否正确,主要是Topic配置方面,例如:Broker 是否有写入权限,topic配置是否存在,队列编号是否正确。
  • 第 11 至 18 行 :检查Topic是否可以被发送。目前是 {@link MixAll.DEFAULT_TOPIC} 不被允许发送。
  • 第 20 至 51 行 :当找不到Topic配置,则进行创建。当然,创建会存在不成功的情况,例如说:defaultTopic 的Topic配置不存在,又或者是 存在但是不允许继承,详细解析见《RocketMQ 源码分析 —— Topic》
DefaultMessageStore#putMessage 1: public PutMessageResult putMessage(MessageExtBrokerInner msg) { 2:     if (this.shutdown) { 3:         log.warn("message store has shutdown, so putMessage is forbidden"); 4:         return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null); 5:     } 6:  7:     // 从节点不允许写入 8:     if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) { 9:         long value = this.printTimes.getAndIncrement();10:         if ((value % 50000) == 0) {11:             log.warn("message store is slave mode, so putMessage is forbidden ");12:         }13: 14:         return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);15:     }16: 17:     // store是否允许写入18:     if (!this.runningFlags.isWriteable()) {19:         long value = this.printTimes.getAndIncrement();20:         if ((value % 50000) == 0) {21:             log.warn("message store is not writeable, so putMessage is forbidden " + this.runningFlags.getFlagBits());22:         }23: 24:         return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);25:     } else {26:         this.printTimes.set(0);27:     }28: 29:     // 消息过长30:     if (msg.getTopic().length() > Byte.MAX_VALUE) {31:         log.warn("putMessage message topic length too long " + msg.getTopic().length());32:         return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null);33:     }34: 35:     // 消息附加属性过长36:     if (msg.getPropertiesString() != null && msg.getPropertiesString().length() > Short.MAX_VALUE) {37:         log.warn("putMessage message properties length too long " + msg.getPropertiesString().length());38:         return new PutMessageResult(PutMessageStatus.PROPERTIES_SIZE_EXCEEDED, null);39:     }40: 41:     if (this.isOSPageCacheBusy()) {42:         return new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, null);43:     }44: 45:     long beginTime = this.getSystemClock().now();46:     // 添加消息到commitLog47:     PutMessageResult result = this.commitLog.putMessage(msg);48: 49:     long eclipseTime = this.getSystemClock().now() - beginTime;50:     if (eclipseTime > 500) {51:         log.warn("putMessage not in lock eclipse time(ms)={}, bodyLength={}", eclipseTime, msg.getBody().length);52:     }53:     this.storeStatsService.setPutMessageEntireTimeMax(eclipseTime);54: 55:     if (null == result || !result.isOk()) {56:         this.storeStatsService.getPutMessageFailedTimes().incrementAndGet();57:     }58: 59:     return result;60: }
  • 说明:存储消息封装,最终存储需要 CommitLog 实现。
  • 第 7 至 27 行 :校验 Broker 是否可以写入。
  • 第 29 至 39 行 :消息格式与大小校验。
  • 第 47 行 :调用 CommitLong 进行存储,详细逻辑见:《RocketMQ 源码分析 —— Message 存储》


【转载】

3 个回复

倒序浏览
回复 使用道具 举报
回复 使用道具 举报
您需要登录后才可以回帖 登录 | 加入黑马