A股上市公司传智教育(股票代码 003032)旗下技术交流社区北京昌平校区

1. 概述
老艿艿:本系列假定胖友已经阅读过 《Apollo 官方 wiki 文档》
本文接 《Apollo 源码解析 —— Admin Service 发送 ReleaseMessage》 一文,分享配置发布的第四步,NotificationControllerV2 得到配置发布的 AppId+Cluster+Namespace 后,会通知对应的客户端 。
  • 客户端会发起一个Http 请求到 Config Service 的 notifications/v2 接口,也就是NotificationControllerV2 ,参见 RemoteConfigLongPollService 。
  • NotificationControllerV2 不会立即返回结果,而是通过 Spring DeferredResult 把请求挂起。
  • 如果在 60 秒内没有该客户端关心的配置发布,那么会返回 Http 状态码 304 给客户端。
  • 如果有该客户端关心的配置发布,NotificationControllerV2 会调用 DeferredResult 的 setResult 方法,传入有配置变化的 namespace 信息,同时该请求会立即返回。客户端从返回的结果中获取到配置变化的 namespace 后,会立即请求 Config Service 获取该 namespace 的最新配置。
友情提示:在目前 Apollo 的实现里,如下的名词是“等价”的:
  • 通知编号 = ReleaseMessage.id
  • Watch Key = ReleaseMessage.message
文章暂时未统一用词,所以胖友看的时候需要“脑补”下。
2. NotificationControllerV2
老艿艿:流程较长,代码较多,请耐心理解。
com.ctrip.framework.apollo.configservice.controller.NotificationControllerV2 ,实现 ReleaseMessageListener 接口,通知 Controller ,仅提供 notifications/v2 接口。
2.1 构造方法
/** * Watch Key 与 DeferredResultWrapper 的 Multimap * * Key:Watch Key * Value:DeferredResultWrapper 数组 */private final Multimap<String, DeferredResultWrapper> deferredResults = Multimaps.synchronizedSetMultimap(HashMultimap.create());private static final Splitter STRING_SPLITTER = Splitter.on(ConfigConsts.CLUSTER_NAMESPACE_SEPARATOR).omitEmptyStrings();private static final Type notificationsTypeReference = new TypeToken<List<ApolloConfigNotification>>() {}.getType();/** * 通过 ReleaseMessage 的消息内容,获得对应 Namespace 的名字 */private static final Function<String, String> retrieveNamespaceFromReleaseMessage =        releaseMessage -> {            if (Strings.isNullOrEmpty(releaseMessage)) {                return null;            }            List<String> keys = STRING_SPLITTER.splitToList(releaseMessage);            //message should be appId+cluster+namespace            if (keys.size() != 3) {                logger.error("message format invalid - {}", releaseMessage);                return null;            }            return keys.get(2);        };/** * 大量通知分批执行 ExecutorService */private final ExecutorService largeNotificationBatchExecutorService;@Autowiredprivate WatchKeysUtil watchKeysUtil;@Autowiredprivate ReleaseMessageServiceWithCache releaseMessageService;@Autowiredprivate EntityManagerUtil entityManagerUtil;@Autowiredprivate NamespaceUtil namespaceUtil;@Autowiredprivate Gson gson;@Autowiredprivate BizConfig bizConfig;public NotificationControllerV2() {    largeNotificationBatchExecutorService = Executors.newSingleThreadExecutor(ApolloThreadFactory.create("NotificationControllerV2", true));}
  • deferredResults 属性,Watch Key 与 DeferredResultWrapper 的 Multimap 。
    • 在下文中,我们会看到大量的 Watch Key 。实际上,目前 Apollo 的实现上,Watch Key 等价于 ReleaseMessage 的通知内容 message 字段。
    • Multimap 指的是 Google Guava Multimap ,不熟悉的胖友可以看看 《Guava 学习笔记:Guava 新增集合类型 - Multimap》 。推荐在项目中使用。
    • 在 notifications/v2 中,当请求的 Namespace 暂无新通知时,会将该 Namespace 对应的 Watch Key 们,注册到 deferredResults 中。等到 Namespace 配置发生变更时,在 #handleMessage(...) 中,进行通知。
  • 其他属性,下文使用到,胖友可以回过头看看代码 + 注释。
2.2 pollNotification
1: @RequestMapping(method = RequestMethod.GET) 2: public DeferredResult<ResponseEntity<List<ApolloConfigNotification>>> pollNotification( 3:         @RequestParam(value = "appId") String appId, 4:         @RequestParam(value = "cluster") String cluster, 5:         @RequestParam(value = "notifications") String notificationsAsString, 6:         @RequestParam(value = "dataCenter", required = false) String dataCenter, 7:         @RequestParam(value = "ip", required = false) String clientIp) { 8:     // 解析 notificationsAsString 参数,创建 ApolloConfigNotification 数组。 9:     List<ApolloConfigNotification> notifications = null;10:     try {11:         notifications = gson.fromJson(notificationsAsString, notificationsTypeReference);12:     } catch (Throwable ex) {13:         Tracer.logError(ex);14:     }15:     if (CollectionUtils.isEmpty(notifications)) {16:         throw new BadRequestException("Invalid format of notifications: " + notificationsAsString);17:     }18:19:     // 创建 DeferredResultWrapper 对象20:     DeferredResultWrapper deferredResultWrapper = new DeferredResultWrapper();21:     // Namespace 集合22:     Set<String> namespaces = Sets.newHashSet();23:     // 客户端的通知 Map 。key 为 Namespace 名,value 为通知编号。24:     Map<String, Long> clientSideNotifications = Maps.newHashMap();25:     // 过滤并创建 ApolloConfigNotification Map26:     Map<String, ApolloConfigNotification> filteredNotifications = filterNotifications(appId, notifications);27:     // 循环 ApolloConfigNotification Map ,初始化上述变量。28:     for (Map.Entry<String, ApolloConfigNotification> notificationEntry : filteredNotifications.entrySet()) {29:         String normalizedNamespace = notificationEntry.getKey();30:         ApolloConfigNotification notification = notificationEntry.getValue();31:         // 添加到 `namespaces` 中。32:         namespaces.add(normalizedNamespace);33:         // 添加到 `clientSideNotifications` 中。34:         clientSideNotifications.put(normalizedNamespace, notification.getNotificationId());35:         // 记录名字被归一化的 Namespace 。因为,最终返回给客户端,使用原始的 Namespace 名字,否则客户端无法识别。36:         if (!Objects.equals(notification.getNamespaceName(), normalizedNamespace)) {37:             deferredResultWrapper.recordNamespaceNameNormalizedResult(notification.getNamespaceName(), normalizedNamespace);38:         }39:     }40:     if (CollectionUtils.isEmpty(namespaces)) {41:         throw new BadRequestException("Invalid format of notifications: " + notificationsAsString);42:     }43:44:     // 组装 Watch Key Multimap45:     Multimap<String, String> watchedKeysMap = watchKeysUtil.assembleAllWatchKeys(appId, cluster, namespaces, dataCenter);46:     // 生成 Watch Key 集合47:     Set<String> watchedKeys = Sets.newHashSet(watchedKeysMap.values());48:     // 获得 Watch Key 集合中,每个 Watch Key 对应的 ReleaseMessage 记录。49:     List<ReleaseMessage> latestReleaseMessages = releaseMessageService.findLatestReleaseMessagesGroupByMessages(watchedKeys);50:51:     /**52:      * Manually close the entity manager.53:      * Since for async request, Spring won't do so until the request is finished,54:      * which is unacceptable since we are doing long polling - means the db connection would be hold55:      * for a very long time56:      */57:     // 手动关闭 EntityManager58:     // 因为对于 async 请求,Spring 在请求完成之前不会这样做59:     // 这是不可接受的,因为我们正在做长轮询——意味着 db 连接将被保留很长时间。60:     // 实际上,下面的过程,我们已经不需要 db 连接,因此进行关闭。61:     entityManagerUtil.closeEntityManager();62:     // 获得新的 ApolloConfigNotification 通知数组63:     List<ApolloConfigNotification> newNotifications = getApolloConfigNotifications(namespaces, clientSideNotifications, watchedKeysMap, latestReleaseMessages);64:     // 若有新的通知,直接设置结果。65:     if (!CollectionUtils.isEmpty(newNotifications)) {66:         deferredResultWrapper.setResult(newNotifications);67:         // 若无新的通知,68:     } else {69:         // 注册超时事件70:         deferredResultWrapper.onTimeout(() -> logWatchedKeys(watchedKeys, "Apollo.LongPoll.TimeOutKeys")); // 【TODO 6001】Tracer 日志71:         // 注册结束事件72:         deferredResultWrapper.onCompletion(() -> {73:             // 移除 Watch Key + DeferredResultWrapper 出 `deferredResults`74:             // unregister all keys75:             for (String key : watchedKeys) {76:                 deferredResults.remove(key, deferredResultWrapper);77:             }78:             // 【TODO 6001】Tracer 日志79:             logWatchedKeys(watchedKeys, "Apollo.LongPoll.CompletedKeys");80:         });81:82:         // 注册 Watch Key + DeferredResultWrapper 到 `deferredResults` 中,等待配置发生变化后通知。详见 `#handleMessage(...)` 方法。83:         // register all keys84:         for (String key : watchedKeys) {85:             this.deferredResults.put(key, deferredResultWrapper);86:         }87:88:         // 【TODO 6001】Tracer 日志89:         logWatchedKeys(watchedKeys, "Apollo.LongPoll.RegisteredKeys");90:         logger.debug("Listening {} from appId: {}, cluster: {}, namespace: {}, datacenter: {}", watchedKeys, appId, cluster, namespaces, dataCenter);91:     }92:93:     return deferredResultWrapper.getResult();94: }
  • GET /notifications/v2 接口,具体 URL 在类上注明。
  • notificationsAsString 请求参数,JSON 字符串,在【第 8 至 17 行】的代码,解析成List<ApolloConfigNotification> ,表示客户端本地的配置通知信息。
    • 因为一个客户端可以订阅多个 Namespace ,所以该参数是 List 。关于 ApolloConfigNotification 类,胖友先跳到 「3. ApolloConfigNotification」 看完在回来。
    • 我们可以注意到,该接口真正返回的结果也是 List<ApolloConfigNotification> ,仅返回配置发生变化的 Namespace 对应的 ApolloConfigNotification 。也就说,当有几个 配置发生变化的 Namespace ,返回几个对应的 ApolloConfigNotification 。另外,客户端接收到返回后,会增量合并到本地的配置通知信息。客户端下次请求时,使用合并后的配置通知信息。
    • 注意,客户端请求时,只传递 ApolloConfigNotification 的 namespaceName +notificationId ,不传递 messages 。
  • clientIp 请求参数,目前该接口暂时用不到,作为预留参数。

3 个回复

倒序浏览
~(。≧3≦)ノ⌒☆
回复 使用道具 举报
回复 使用道具 举报
您需要登录后才可以回帖 登录 | 加入黑马