2、Producer 发送消息
DefaultMQProducer#send(Message)1: @Override2: public SendResult send(Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {3: return this.defaultMQProducerImpl.send(msg);4: }
MQFaultStrategy 1: public class MQFaultStrategy { 2: private final static Logger log = ClientLogger.getLog(); 3: 4: /** 5: * 延迟故障容错,维护每个Broker的发送消息的延迟 6: * key:brokerName 7: */ 8: private final LatencyFaultTolerance<String> latencyFaultTolerance = new LatencyFaultToleranceImpl(); 9: /**10: * 发送消息延迟容错开关11: */12: private boolean sendLatencyFaultEnable = false;13: /**14: * 延迟级别数组15: */16: private long[] latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};17: /**18: * 不可用时长数组19: */20: private long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};21: 22: /**23: * 根据 Topic发布信息 选择一个消息队列24: *25: * @param tpInfo Topic发布信息26: * @param lastBrokerName brokerName27: * @return 消息队列28: */29: public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {30: if (this.sendLatencyFaultEnable) {31: try {32: // 获取 brokerName=lastBrokerName && 可用的一个消息队列33: int index = tpInfo.getSendWhichQueue().getAndIncrement();34: for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {35: int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();36: if (pos < 0)37: pos = 0;38: MessageQueue mq = tpInfo.getMessageQueueList().get(pos);39: if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {40: if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))41: return mq;42: }43: }44: // 选择一个相对好的broker,并获得其对应的一个消息队列,不考虑该队列的可用性45: final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();46: int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);47: if (writeQueueNums > 0) {48: final MessageQueue mq = tpInfo.selectOneMessageQueue();49: if (notBestBroker != null) {50: mq.setBrokerName(notBestBroker);51: mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);52: }53: return mq;54: } else {55: latencyFaultTolerance.remove(notBestBroker);56: }57: } catch (Exception e) {58: log.error("Error occurred when selecting message queue", e);59: }60: // 选择一个消息队列,不考虑队列的可用性61: return tpInfo.selectOneMessageQueue();62: }63: // 获得 lastBrokerName 对应的一个消息队列,不考虑该队列的可用性64: return tpInfo.selectOneMessageQueue(lastBrokerName);65: }66: 67: /**68: * 更新延迟容错信息69: *70: * @param brokerName brokerName71: * @param currentLatency 延迟72: * @param isolation 是否隔离。当开启隔离时,默认延迟为30000。目前主要用于发送消息异常时73: */74: public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {75: if (this.sendLatencyFaultEnable) {76: long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency);77: this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration);78: }79: }80: 81: /**82: * 计算延迟对应的不可用时间83: *84: * @param currentLatency 延迟85: * @return 不可用时间86: */87: private long computeNotAvailableDuration(final long currentLatency) {88: for (int i = latencyMax.length - 1; i >= 0; i--) {89: if (currentLatency >= latencyMax)90: return this.notAvailableDuration;91: }92: return 0;93: }
SendMessageProcessor#sendMessage 1: @Override 2: public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { 3: SendMessageContext mqtraceContext; 4: switch (request.getCode()) { 5: case RequestCode.CONSUMER_SEND_MSG_BACK: 6: return this.consumerSendMsgBack(ctx, request); 7: default: 8: // 解析请求 9: SendMessageRequestHeader requestHeader = parseRequestHeader(request); 10: if (requestHeader == null) { 11: return null; 12: } 13: // 发送请求Context。在 hook 场景下使用 14: mqtraceContext = buildMsgContext(ctx, requestHeader); 15: // hook:处理发送消息前逻辑 16: this.executeSendMessageHookBefore(ctx, request, mqtraceContext); 17: // 处理发送消息逻辑 18: final RemotingCommand response = this.sendMessage(ctx, request, mqtraceContext, requestHeader); 19: // hook:处理发送消息后逻辑 20: this.executeSendMessageHookAfter(response, mqtraceContext); 21: return response; 22: } 23: } 24: 25: private RemotingCommand sendMessage(final ChannelHandlerContext ctx, // 26: final RemotingCommand request, // 27: final SendMessageContext sendMessageContext, // 28: final SendMessageRequestHeader requestHeader) throws RemotingCommandException { 29: 30: // 初始化响应 31: final RemotingCommand response = RemotingCommand.createResponseCommand(SendMessageResponseHeader.class); 32: final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader) response.readCustomHeader(); 33: response.setOpaque(request.getOpaque()); 34: response.addExtField(MessageConst.PROPERTY_MSG_REGION, this.brokerController.getBrokerConfig().getRegionId()); 35: response.addExtField(MessageConst.PROPERTY_TRACE_SWITCH, String.valueOf(this.brokerController.getBrokerConfig().isTraceOn())); 36: 37: if (log.isDebugEnabled()) { 38: log.debug("receive SendMessage request command, {}", request); 39: } 40: 41: // 如果未开始接收消息,抛出系统异常 42: @SuppressWarnings("SpellCheckingInspection") 43: final long startTimstamp = this.brokerController.getBrokerConfig().getStartAcceptSendRequestTimeStamp(); 44: if (this.brokerController.getMessageStore().now() < startTimstamp) { 45: response.setCode(ResponseCode.SYSTEM_ERROR); 46: response.setRemark(String.format("broker unable to service, until %s", UtilAll.timeMillisToHumanString2(startTimstamp))); 47: return response; 48: } 49: 50: // 消息配置(Topic配置)校验 51: response.setCode(-1); 52: super.msgCheck(ctx, requestHeader, response); 53: if (response.getCode() != -1) { 54: return response; 55: } 56: 57: final byte[] body = request.getBody(); 58: 59: // 如果队列小于0,从可用队列随机选择 60: int queueIdInt = requestHeader.getQueueId(); 61: TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic()); 62: if (queueIdInt < 0) { 63: queueIdInt = Math.abs(this.random.nextInt() % 99999999) % topicConfig.getWriteQueueNums(); 64: } 65: 66: // 67: int sysFlag = requestHeader.getSysFlag(); 68: if (TopicFilterType.MULTI_TAG == topicConfig.getTopicFilterType()) { 69: sysFlag |= MessageSysFlag.MULTI_TAGS_FLAG; 70: } 71: 72: // 对RETRY类型的消息处理。如果超过最大消费次数,则topic修改成"%DLQ%" + 分组名,即加入 死信队列(Dead Letter Queue) 73: String newTopic = requestHeader.getTopic(); 74: if (null != newTopic && newTopic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { 75: // 获取订阅分组配置 76: String groupName = newTopic.substring(MixAll.RETRY_GROUP_TOPIC_PREFIX.length()); 77: SubscriptionGroupConfig subscriptionGroupConfig = 78: this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(groupName); 79: if (null == subscriptionGroupConfig) { 80: response.setCode(ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST); 81: response.setRemark("subscription group not exist, " + groupName + " " + FAQUrl.suggestTodo(FAQUrl.SUBSCRIPTION_GROUP_NOT_EXIST)); 82: return response; 83: } 84: // 计算最大可消费次数 85: int maxReconsumeTimes = subscriptionGroupConfig.getRetryMaxTimes(); 86: if (request.getVersion() >= MQVersion.Version.V3_4_9.ordinal()) { 87: maxReconsumeTimes = requestHeader.getMaxReconsumeTimes(); 88: } 89: int reconsumeTimes = requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes(); 90: if (reconsumeTimes >= maxReconsumeTimes) { // 超过最大消费次数 91: newTopic = MixAll.getDLQTopic(groupName); 92: queueIdInt = Math.abs(this.random.nextInt() % 99999999) % DLQ_NUMS_PER_GROUP; 93: topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic, // 94: DLQ_NUMS_PER_GROUP, // 95: PermName.PERM_WRITE, 0 96: ); 97: if (null == topicConfig) { 98: response.setCode(ResponseCode.SYSTEM_ERROR); 99: response.setRemark("topic[" + newTopic + "] not exist");100: return response;101: }102: }103: }104: 105: // 创建MessageExtBrokerInner106: MessageExtBrokerInner msgInner = new MessageExtBrokerInner();107: msgInner.setTopic(newTopic);108: msgInner.setBody(body);109: msgInner.setFlag(requestHeader.getFlag());110: MessageAccessor.setProperties(msgInner, MessageDecoder.string2messageProperties(requestHeader.getProperties()));111: msgInner.setPropertiesString(requestHeader.getProperties());112: msgInner.setTagsCode(MessageExtBrokerInner.tagsString2tagsCode(topicConfig.getTopicFilterType(), msgInner.getTags()));113: msgInner.setQueueId(queueIdInt);114: msgInner.setSysFlag(sysFlag);115: msgInner.setBornTimestamp(requestHeader.getBornTimestamp());116: msgInner.setBornHost(ctx.channel().remoteAddress());117: msgInner.setStoreHost(this.getStoreHost());118: msgInner.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes());119: 120: // 校验是否不允许发送事务消息121: if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {122: String traFlag = msgInner.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);123: if (traFlag != null) {124: response.setCode(ResponseCode.NO_PERMISSION);125: response.setRemark(126: "the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1() + "] sending transaction message is forbidden");127: return response;128: }129: }130: 131: // 添加消息132: PutMessageResult putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);133: if (putMessageResult != null) {134: boolean sendOK = false;135: 136: switch (putMessageResult.getPutMessageStatus()) {137: // Success138: case PUT_OK:139: sendOK = true;140: response.setCode(ResponseCode.SUCCESS);141: break;142: case FLUSH_DISK_TIMEOUT:143: response.setCode(ResponseCode.FLUSH_DISK_TIMEOUT);144: sendOK = true;145: break;146: case FLUSH_SLAVE_TIMEOUT:147: response.setCode(ResponseCode.FLUSH_SLAVE_TIMEOUT);148: sendOK = true;149: break;150: case SLAVE_NOT_AVAILABLE:151: response.setCode(ResponseCode.SLAVE_NOT_AVAILABLE);152: sendOK = true;153: break;154: 155: // Failed156: case CREATE_MAPEDFILE_FAILED:157: response.setCode(ResponseCode.SYSTEM_ERROR);158: response.setRemark("create mapped file failed, server is busy or broken.");159: break;160: case MESSAGE_ILLEGAL:161: case PROPERTIES_SIZE_EXCEEDED:162: response.setCode(ResponseCode.MESSAGE_ILLEGAL);163: response.setRemark(164: "the message is illegal, maybe msg body or properties length not matched. msg body length limit 128k, msg properties length limit 32k.");165: break;166: case SERVICE_NOT_AVAILABLE:167: response.setCode(ResponseCode.SERVICE_NOT_AVAILABLE);168: response.setRemark(169: "service not available now, maybe disk full, " + diskUtil() + ", maybe your broker machine memory too small.");170: break;171: case OS_PAGECACHE_BUSY:172: response.setCode(ResponseCode.SYSTEM_ERROR);173: response.setRemark("[PC_SYNCHRONIZED]broker busy, start flow control for a while");174: break;175: case UNKNOWN_ERROR:176: response.setCode(ResponseCode.SYSTEM_ERROR);177: response.setRemark("UNKNOWN_ERROR");178: break;179: default:180: response.setCode(ResponseCode.SYSTEM_ERROR);181: response.setRemark("UNKNOWN_ERROR DEFAULT");182: break;183: }184: 185: String owner = request.getExtFields().get(BrokerStatsManager.COMMERCIAL_OWNER);186: if (sendOK) {187: // 统计188: this.brokerController.getBrokerStatsManager().incTopicPutNums(msgInner.getTopic());189: this.brokerController.getBrokerStatsManager().incTopicPutSize(msgInner.getTopic(), putMessageResult.getAppendMessageResult().getWroteBytes());190: this.brokerController.getBrokerStatsManager().incBrokerPutNums();191: 192: // 响应193: response.setRemark(null);194: responseHeader.setMsgId(putMessageResult.getAppendMessageResult().getMsgId());195: responseHeader.setQueueId(queueIdInt);196: responseHeader.setQueueOffset(putMessageResult.getAppendMessageResult().getLogicsOffset());197: doResponse(ctx, request, response);198: 199: // hook:设置发送成功到context200: if (hasSendMessageHook()) {201: sendMessageContext.setMsgId(responseHeader.getMsgId());202: sendMessageContext.setQueueId(responseHeader.getQueueId());203: sendMessageContext.setQueueOffset(responseHeader.getQueueOffset());204: 205: int commercialBaseCount = brokerController.getBrokerConfig().getCommercialBaseCount();206: int wroteSize = putMessageResult.getAppendMessageResult().getWroteBytes();207: int incValue = (int) Math.ceil(wroteSize / BrokerStatsManager.SIZE_PER_COUNT) * commercialBaseCount;208: 209: sendMessageContext.setCommercialSendStats(BrokerStatsManager.StatsType.SEND_SUCCESS);210: sendMessageContext.setCommercialSendTimes(incValue);211: sendMessageContext.setCommercialSendSize(wroteSize);212: sendMessageContext.setCommercialOwner(owner);213: }214: return null;215: } else {216: // hook:设置发送失败到context217: if (hasSendMessageHook()) {218: int wroteSize = request.getBody().length;219: int incValue = (int) Math.ceil(wroteSize / BrokerStatsManager.SIZE_PER_COUNT);220: 221: sendMessageContext.setCommercialSendStats(BrokerStatsManager.StatsType.SEND_FAILURE);222: sendMessageContext.setCommercialSendTimes(incValue);223: sendMessageContext.setCommercialSendSize(wroteSize);224: sendMessageContext.setCommercialOwner(owner);225: }226: }227: } else {228: response.setCode(ResponseCode.SYSTEM_ERROR);229: response.setRemark("store putMessage return null");230: }231: 232: return response;233: }
欢迎光临 黑马程序员技术交流社区 (http://bbs.itheima.com/) | 黑马程序员IT技术论坛 X3.2 |