1、概述 主要解析 Consumer 在 消费 逻辑涉及到的源码。 2、ConsumerMQ 提供了两类消费者: - PushConsumer:
- 在大多数场景下使用。
- 名字虽然是 Push 开头,实际在实现时,使用 Pull 方式实现。通过 Pull 不断不断不断轮询 Broker 获取消息。当不存在新消息时,Broker 会挂起请求,直到有新消息产生,取消挂起,返回新消息。这样,基本和 Broker 主动 Push 做到接近的实时性(当然,还是有相应的实时性损失)。原理类似 长轮询( Long-Polling )。
- PullConsumer
本文主要讲解PushConsumer,部分讲解PullConsumer,跳过顺序消费。
本文主要讲解PushConsumer,部分讲解PullConsumer,跳过顺序消费。
本文主要讲解PushConsumer,部分讲解PullConsumer,跳过顺序消费。 3、PushConsumer 一览先看一张 PushConsumer 包含的组件以及组件之间的交互图: - RebalanceService:均衡消息队列服务,负责分配当前 Consumer 可消费的消息队列(MessageQueue )。当有新的 Consumer 的加入或移除,都会重新分配消息队列。
- PullMessageService:拉取消息服务,不断不断不断从 Broker 拉取消息,并提交消费任务到ConsumeMessageService。
- ConsumeMessageService:消费消息服务,不断不断不断消费消息,并处理消费结果。
- RemoteBrokerOffsetStore:Consumer 消费进度管理,负责从 Broker 获取消费进度,同步消费进度到 Broker。
- ProcessQueue :消息处理队列。
- MQClientInstance :封装对 Namesrv,Broker 的 API调用,提供给 Producer、Consumer使用。
4、PushConsumer 订阅DefaultMQPushConsumerImpl#subscribe(…) 1: public void subscribe(String topic, String subExpression) throws MQClientException { 2: try { 3: // 创建订阅数据 4: SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(), // 5: topic, subExpression); 6: this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData); 7: // 通过心跳同步Consumer信息到Broker 8: if (this.mQClientFactory != null) { 9: this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();10: }11: } catch (Exception e) {12: throw new MQClientException("subscription exception", e);13: }14: }FilterAPI.buildSubscriptionData(…) 1: public static SubscriptionData buildSubscriptionData(final String consumerGroup, String topic, 2: String subString) throws Exception { 3: SubscriptionData subscriptionData = new SubscriptionData(); 4: subscriptionData.setTopic(topic); 5: subscriptionData.setSubString(subString); 6: // 处理订阅表达式 7: if (null == subString || subString.equals(SubscriptionData.SUB_ALL) || subString.length() == 0) { 8: subscriptionData.setSubString(SubscriptionData.SUB_ALL); 9: } else {10: String[] tags = subString.split("\\|\\|");11: if (tags.length > 0) {12: for (String tag : tags) {13: if (tag.length() > 0) {14: String trimString = tag.trim();15: if (trimString.length() > 0) {16: subscriptionData.getTagsSet().add(trimString);17: subscriptionData.getCodeSet().add(trimString.hashCode());18: }19: }20: }21: } else {22: throw new Exception("subString split error");23: }24: }25: 26: return subscriptionData;27: }- 说明 :根据 Topic 和 订阅表达式 创建订阅数据
- subscriptionData.subVersion = System.currentTimeMillis()。
DefaultMQPushConsumer#registerMessageListener(…)1: public void registerMessageListener(MessageListenerConcurrently messageListener) {2: this.messageListener = messageListener;3: this.defaultMQPushConsumerImpl.registerMessageListener(messageListener);4: }5、PushConsumer 消息队列分配RebalanceService 1: public class RebalanceService extends ServiceThread { 2: 3: /** 4: * 等待间隔,单位:毫秒 5: */ 6: private static long waitInterval = 7: Long.parseLong(System.getProperty( 8: "rocketmq.client.rebalance.waitInterval", "20000")); 9: 10: private final Logger log = ClientLogger.getLog();11: /**12: * MQClient对象13: */14: private final MQClientInstance mqClientFactory;15: 16: public RebalanceService(MQClientInstance mqClientFactory) {17: this.mqClientFactory = mqClientFactory;18: }19: 20: @Override21: public void run() {22: log.info(this.getServiceName() + " service started");23: 24: while (!this.isStopped()) {25: this.waitForRunning(waitInterval);26: this.mqClientFactory.doRebalance();27: }28: 29: log.info(this.getServiceName() + " service end");30: }31: 32: @Override33: public String getServiceName() {34: return RebalanceService.class.getSimpleName();35: }36: }MQClientInstance#doRebalance(…) 1: public void doRebalance() { 2: for (Map.Entry<String, MQConsumerInner> entry : this.consumerTable.entrySet()) { 3: MQConsumerInner impl = entry.getValue(); 4: if (impl != null) { 5: try { 6: impl.doRebalance(); 7: } catch (Throwable e) { 8: log.error("doRebalance exception", e); 9: }10: }11: }12: }- 说明 :遍历当前 Client 包含的 consumerTable( Consumer集合 ),执行消息队列分配。
- 疑问:目前代码调试下来,consumerTable 只包含 Consumer 自己。
|