黑马程序员技术交流社区

标题: 【上海校区】分布式消息队列 RocketMQ 源码分析 —— Message ... [打印本页]

作者: 不二晨    时间: 2018-8-1 09:55
标题: 【上海校区】分布式消息队列 RocketMQ 源码分析 —— Message ...
1、概述
本文接:《RocketMQ 源码分析 —— Message 拉取与消费(上)》
主要解析 Consumer 在 消费 逻辑涉及到的源码。
2、Consumer
MQ 提供了两类消费者:
本文主要讲解PushConsumer,部分讲解PullConsumer,跳过顺序消费。
本文主要讲解PushConsumer,部分讲解PullConsumer,跳过顺序消费。
本文主要讲解PushConsumer,部分讲解PullConsumer,跳过顺序消费。
3、PushConsumer 一览
先看一张 PushConsumer 包含的组件以及组件之间的交互图:
4、PushConsumer 订阅DefaultMQPushConsumerImpl#subscribe(…) 1: public void subscribe(String topic, String subExpression) throws MQClientException { 2:     try { 3:         // 创建订阅数据 4:         SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(), // 5:             topic, subExpression); 6:         this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData); 7:         // 通过心跳同步Consumer信息到Broker 8:         if (this.mQClientFactory != null) { 9:             this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();10:         }11:     } catch (Exception e) {12:         throw new MQClientException("subscription exception", e);13:     }14: }FilterAPI.buildSubscriptionData(…) 1: public static SubscriptionData buildSubscriptionData(final String consumerGroup, String topic, 2:     String subString) throws Exception { 3:     SubscriptionData subscriptionData = new SubscriptionData(); 4:     subscriptionData.setTopic(topic); 5:     subscriptionData.setSubString(subString); 6:     // 处理订阅表达式 7:     if (null == subString || subString.equals(SubscriptionData.SUB_ALL) || subString.length() == 0) { 8:         subscriptionData.setSubString(SubscriptionData.SUB_ALL); 9:     } else {10:         String[] tags = subString.split("\\|\\|");11:         if (tags.length > 0) {12:             for (String tag : tags) {13:                 if (tag.length() > 0) {14:                     String trimString = tag.trim();15:                     if (trimString.length() > 0) {16:                         subscriptionData.getTagsSet().add(trimString);17:                         subscriptionData.getCodeSet().add(trimString.hashCode());18:                     }19:                 }20:             }21:         } else {22:             throw new Exception("subString split error");23:         }24:     }25: 26:     return subscriptionData;27: }DefaultMQPushConsumer#registerMessageListener(…)1: public void registerMessageListener(MessageListenerConcurrently messageListener) {2:     this.messageListener = messageListener;3:     this.defaultMQPushConsumerImpl.registerMessageListener(messageListener);4: }5、PushConsumer 消息队列分配
RebalanceService 1: public class RebalanceService extends ServiceThread { 2:  3:     /** 4:      * 等待间隔,单位:毫秒 5:      */ 6:     private static long waitInterval = 7:         Long.parseLong(System.getProperty( 8:             "rocketmq.client.rebalance.waitInterval", "20000")); 9: 10:     private final Logger log = ClientLogger.getLog();11:     /**12:      * MQClient对象13:      */14:     private final MQClientInstance mqClientFactory;15: 16:     public RebalanceService(MQClientInstance mqClientFactory) {17:         this.mqClientFactory = mqClientFactory;18:     }19: 20:     @Override21:     public void run() {22:         log.info(this.getServiceName() + " service started");23: 24:         while (!this.isStopped()) {25:             this.waitForRunning(waitInterval);26:             this.mqClientFactory.doRebalance();27:         }28: 29:         log.info(this.getServiceName() + " service end");30:     }31: 32:     @Override33:     public String getServiceName() {34:         return RebalanceService.class.getSimpleName();35:     }36: }MQClientInstance#doRebalance(…) 1: public void doRebalance() { 2:     for (Map.Entry<String, MQConsumerInner> entry : this.consumerTable.entrySet()) { 3:         MQConsumerInner impl = entry.getValue(); 4:         if (impl != null) { 5:             try { 6:                 impl.doRebalance(); 7:             } catch (Throwable e) { 8:                 log.error("doRebalance exception", e); 9:             }10:         }11:     }12: }