A股上市公司传智教育(股票代码 003032)旗下技术交流社区北京昌平校区

 找回密码
 加入黑马

QQ登录

只需一步,快速开始

1、概述
主要解析 Consumer 在 消费 逻辑涉及到的源码。
2、Consumer
MQ 提供了两类消费者:
  • PushConsumer:
    • 在大多数场景下使用。
    • 名字虽然是 Push 开头,实际在实现时,使用 Pull 方式实现。通过 Pull 不断不断不断轮询 Broker 获取消息。当不存在新消息时,Broker 会挂起请求,直到有新消息产生,取消挂起,返回新消息。这样,基本和 Broker 主动 Push 做到接近的实时性(当然,还是有相应的实时性损失)。原理类似 长轮询( Long-Polling )
  • PullConsumer
本文主要讲解PushConsumer,部分讲解PullConsumer,跳过顺序消费。
本文主要讲解PushConsumer,部分讲解PullConsumer,跳过顺序消费。
本文主要讲解PushConsumer,部分讲解PullConsumer,跳过顺序消费。
3、PushConsumer 一览
先看一张 PushConsumer 包含的组件以及组件之间的交互图:
  • RebalanceService:均衡消息队列服务,负责分配当前 Consumer 可消费的消息队列(MessageQueue )。当有新的 Consumer 的加入或移除,都会重新分配消息队列。
  • PullMessageService:拉取消息服务,不断不断不断从 Broker 拉取消息,并提交消费任务到ConsumeMessageService。
  • ConsumeMessageService:消费消息服务,不断不断不断消费消息,并处理消费结果。
  • RemoteBrokerOffsetStore:Consumer 消费进度管理,负责从 Broker 获取消费进度,同步消费进度到 Broker。
  • ProcessQueue :消息处理队列。
  • MQClientInstance :封装对 Namesrv,Broker 的 API调用,提供给 Producer、Consumer使用。
4、PushConsumer 订阅DefaultMQPushConsumerImpl#subscribe(…) 1: public void subscribe(String topic, String subExpression) throws MQClientException { 2:     try { 3:         // 创建订阅数据 4:         SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(), // 5:             topic, subExpression); 6:         this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData); 7:         // 通过心跳同步Consumer信息到Broker 8:         if (this.mQClientFactory != null) { 9:             this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();10:         }11:     } catch (Exception e) {12:         throw new MQClientException("subscription exception", e);13:     }14: }
  • 说明 :订阅 Topic 。
  • 第 3 至 6 行 :创建订阅数据。详细解析见:FilterAPI.buildSubscriptionData(…)
  • 第 7 至 10 行 :通过心跳同步 Consumer 信息到 Broker。
FilterAPI.buildSubscriptionData(…) 1: public static SubscriptionData buildSubscriptionData(final String consumerGroup, String topic, 2:     String subString) throws Exception { 3:     SubscriptionData subscriptionData = new SubscriptionData(); 4:     subscriptionData.setTopic(topic); 5:     subscriptionData.setSubString(subString); 6:     // 处理订阅表达式 7:     if (null == subString || subString.equals(SubscriptionData.SUB_ALL) || subString.length() == 0) { 8:         subscriptionData.setSubString(SubscriptionData.SUB_ALL); 9:     } else {10:         String[] tags = subString.split("\\|\\|");11:         if (tags.length > 0) {12:             for (String tag : tags) {13:                 if (tag.length() > 0) {14:                     String trimString = tag.trim();15:                     if (trimString.length() > 0) {16:                         subscriptionData.getTagsSet().add(trimString);17:                         subscriptionData.getCodeSet().add(trimString.hashCode());18:                     }19:                 }20:             }21:         } else {22:             throw new Exception("subString split error");23:         }24:     }25: 26:     return subscriptionData;27: }
  • 说明 :根据 Topic 和 订阅表达式 创建订阅数据
  • subscriptionData.subVersion = System.currentTimeMillis()。
DefaultMQPushConsumer#registerMessageListener(…)1: public void registerMessageListener(MessageListenerConcurrently messageListener) {2:     this.messageListener = messageListener;3:     this.defaultMQPushConsumerImpl.registerMessageListener(messageListener);4: }
  • 说明 :注册消息监听器。
5、PushConsumer 消息队列分配
RebalanceService 1: public class RebalanceService extends ServiceThread { 2:  3:     /** 4:      * 等待间隔,单位:毫秒 5:      */ 6:     private static long waitInterval = 7:         Long.parseLong(System.getProperty( 8:             "rocketmq.client.rebalance.waitInterval", "20000")); 9: 10:     private final Logger log = ClientLogger.getLog();11:     /**12:      * MQClient对象13:      */14:     private final MQClientInstance mqClientFactory;15: 16:     public RebalanceService(MQClientInstance mqClientFactory) {17:         this.mqClientFactory = mqClientFactory;18:     }19: 20:     @Override21:     public void run() {22:         log.info(this.getServiceName() + " service started");23: 24:         while (!this.isStopped()) {25:             this.waitForRunning(waitInterval);26:             this.mqClientFactory.doRebalance();27:         }28: 29:         log.info(this.getServiceName() + " service end");30:     }31: 32:     @Override33:     public String getServiceName() {34:         return RebalanceService.class.getSimpleName();35:     }36: }
  • 说明 :均衡消息队列服务,负责分配当前 Consumer 可消费的消息队列( MessageQueue )。
  • 第 26 行 :调用 MQClientInstance#doRebalance(...) 分配消息队列。目前有三种情况情况下触发:
    • 如 第 25 行 等待超时,每 20s 调用一次。
    • PushConsumer 启动时,调用 rebalanceService#wakeup(...) 触发。
    • Broker 通知 Consumer 加入 或 移除时,Consumer 响应通知,调用rebalanceService#wakeup(...) 触发。

MQClientInstance#doRebalance(…) 1: public void doRebalance() { 2:     for (Map.Entry<String, MQConsumerInner> entry : this.consumerTable.entrySet()) { 3:         MQConsumerInner impl = entry.getValue(); 4:         if (impl != null) { 5:             try { 6:                 impl.doRebalance(); 7:             } catch (Throwable e) { 8:                 log.error("doRebalance exception", e); 9:             }10:         }11:     }12: }
  • 说明 :遍历当前 Client 包含的 consumerTable( Consumer集合 ),执行消息队列分配。
  • 疑问:目前代码调试下来,consumerTable 只包含 Consumer 自己。

1 个回复

倒序浏览

奈斯,很赞
回复 使用道具 举报
您需要登录后才可以回帖 登录 | 加入黑马