1. 概述- 客户端会发起一个Http 请求到 Config Service 的 notifications/v2 接口,也就是NotificationControllerV2 ,参见 RemoteConfigLongPollService 。
- NotificationControllerV2 不会立即返回结果,而是通过 Spring DeferredResult 把请求挂起。
- 如果在 60 秒内没有该客户端关心的配置发布,那么会返回 Http 状态码 304 给客户端。
- 如果有该客户端关心的配置发布,NotificationControllerV2 会调用 DeferredResult 的 setResult 方法,传入有配置变化的 namespace 信息,同时该请求会立即返回。客户端从返回的结果中获取到配置变化的 namespace 后,会立即请求 Config Service 获取该 namespace 的最新配置。
友情提示:在目前 Apollo 的实现里,如下的名词是“等价”的: - 通知编号 = ReleaseMessage.id
- Watch Key = ReleaseMessage.message
文章暂时未统一用词,所以胖友看的时候需要“脑补”下。 2. NotificationControllerV2老艿艿:流程较长,代码较多,请耐心理解。
com.ctrip.framework.apollo.configservice.controller.NotificationControllerV2 ,实现 ReleaseMessageListener 接口,通知 Controller ,仅提供 notifications/v2 接口。 2.1 构造方法
/** * Watch Key 与 DeferredResultWrapper 的 Multimap * * Key:Watch Key * Value:DeferredResultWrapper 数组 */private final Multimap<String, DeferredResultWrapper> deferredResults = Multimaps.synchronizedSetMultimap(HashMultimap.create());private static final Splitter STRING_SPLITTER = Splitter.on(ConfigConsts.CLUSTER_NAMESPACE_SEPARATOR).omitEmptyStrings();private static final Type notificationsTypeReference = new TypeToken<List<ApolloConfigNotification>>() {}.getType();/** * 通过 ReleaseMessage 的消息内容,获得对应 Namespace 的名字 */private static final Function<String, String> retrieveNamespaceFromReleaseMessage = releaseMessage -> { if (Strings.isNullOrEmpty(releaseMessage)) { return null; } List<String> keys = STRING_SPLITTER.splitToList(releaseMessage); //message should be appId+cluster+namespace if (keys.size() != 3) { logger.error("message format invalid - {}", releaseMessage); return null; } return keys.get(2); };/** * 大量通知分批执行 ExecutorService */private final ExecutorService largeNotificationBatchExecutorService;@Autowiredprivate WatchKeysUtil watchKeysUtil;@Autowiredprivate ReleaseMessageServiceWithCache releaseMessageService;@Autowiredprivate EntityManagerUtil entityManagerUtil;@Autowiredprivate NamespaceUtil namespaceUtil;@Autowiredprivate Gson gson;@Autowiredprivate BizConfig bizConfig;public NotificationControllerV2() { largeNotificationBatchExecutorService = Executors.newSingleThreadExecutor(ApolloThreadFactory.create("NotificationControllerV2", true));}
- deferredResults 属性,Watch Key 与 DeferredResultWrapper 的 Multimap 。
- 在下文中,我们会看到大量的 Watch Key 。实际上,目前 Apollo 的实现上,Watch Key 等价于 ReleaseMessage 的通知内容 message 字段。
- Multimap 指的是 Google Guava Multimap ,不熟悉的胖友可以看看 《Guava 学习笔记:Guava 新增集合类型 - Multimap》 。推荐在项目中使用。
- 在 notifications/v2 中,当请求的 Namespace 暂无新通知时,会将该 Namespace 对应的 Watch Key 们,注册到 deferredResults 中。等到 Namespace 配置发生变更时,在 #handleMessage(...) 中,进行通知。
- 其他属性,下文使用到,胖友可以回过头看看代码 + 注释。
2.2 pollNotification
1: @RequestMapping(method = RequestMethod.GET) 2: public DeferredResult<ResponseEntity<List<ApolloConfigNotification>>> pollNotification( 3: @RequestParam(value = "appId") String appId, 4: @RequestParam(value = "cluster") String cluster, 5: @RequestParam(value = "notifications") String notificationsAsString, 6: @RequestParam(value = "dataCenter", required = false) String dataCenter, 7: @RequestParam(value = "ip", required = false) String clientIp) { 8: // 解析 notificationsAsString 参数,创建 ApolloConfigNotification 数组。 9: List<ApolloConfigNotification> notifications = null;10: try {11: notifications = gson.fromJson(notificationsAsString, notificationsTypeReference);12: } catch (Throwable ex) {13: Tracer.logError(ex);14: }15: if (CollectionUtils.isEmpty(notifications)) {16: throw new BadRequestException("Invalid format of notifications: " + notificationsAsString);17: }18:19: // 创建 DeferredResultWrapper 对象20: DeferredResultWrapper deferredResultWrapper = new DeferredResultWrapper();21: // Namespace 集合22: Set<String> namespaces = Sets.newHashSet();23: // 客户端的通知 Map 。key 为 Namespace 名,value 为通知编号。24: Map<String, Long> clientSideNotifications = Maps.newHashMap();25: // 过滤并创建 ApolloConfigNotification Map26: Map<String, ApolloConfigNotification> filteredNotifications = filterNotifications(appId, notifications);27: // 循环 ApolloConfigNotification Map ,初始化上述变量。28: for (Map.Entry<String, ApolloConfigNotification> notificationEntry : filteredNotifications.entrySet()) {29: String normalizedNamespace = notificationEntry.getKey();30: ApolloConfigNotification notification = notificationEntry.getValue();31: // 添加到 `namespaces` 中。32: namespaces.add(normalizedNamespace);33: // 添加到 `clientSideNotifications` 中。34: clientSideNotifications.put(normalizedNamespace, notification.getNotificationId());35: // 记录名字被归一化的 Namespace 。因为,最终返回给客户端,使用原始的 Namespace 名字,否则客户端无法识别。36: if (!Objects.equals(notification.getNamespaceName(), normalizedNamespace)) {37: deferredResultWrapper.recordNamespaceNameNormalizedResult(notification.getNamespaceName(), normalizedNamespace);38: }39: }40: if (CollectionUtils.isEmpty(namespaces)) {41: throw new BadRequestException("Invalid format of notifications: " + notificationsAsString);42: }43:44: // 组装 Watch Key Multimap45: Multimap<String, String> watchedKeysMap = watchKeysUtil.assembleAllWatchKeys(appId, cluster, namespaces, dataCenter);46: // 生成 Watch Key 集合47: Set<String> watchedKeys = Sets.newHashSet(watchedKeysMap.values());48: // 获得 Watch Key 集合中,每个 Watch Key 对应的 ReleaseMessage 记录。49: List<ReleaseMessage> latestReleaseMessages = releaseMessageService.findLatestReleaseMessagesGroupByMessages(watchedKeys);50:51: /**52: * Manually close the entity manager.53: * Since for async request, Spring won't do so until the request is finished,54: * which is unacceptable since we are doing long polling - means the db connection would be hold55: * for a very long time56: */57: // 手动关闭 EntityManager58: // 因为对于 async 请求,Spring 在请求完成之前不会这样做59: // 这是不可接受的,因为我们正在做长轮询——意味着 db 连接将被保留很长时间。60: // 实际上,下面的过程,我们已经不需要 db 连接,因此进行关闭。61: entityManagerUtil.closeEntityManager();62: // 获得新的 ApolloConfigNotification 通知数组63: List<ApolloConfigNotification> newNotifications = getApolloConfigNotifications(namespaces, clientSideNotifications, watchedKeysMap, latestReleaseMessages);64: // 若有新的通知,直接设置结果。65: if (!CollectionUtils.isEmpty(newNotifications)) {66: deferredResultWrapper.setResult(newNotifications);67: // 若无新的通知,68: } else {69: // 注册超时事件70: deferredResultWrapper.onTimeout(() -> logWatchedKeys(watchedKeys, "Apollo.LongPoll.TimeOutKeys")); // 【TODO 6001】Tracer 日志71: // 注册结束事件72: deferredResultWrapper.onCompletion(() -> {73: // 移除 Watch Key + DeferredResultWrapper 出 `deferredResults`74: // unregister all keys75: for (String key : watchedKeys) {76: deferredResults.remove(key, deferredResultWrapper);77: }78: // 【TODO 6001】Tracer 日志79: logWatchedKeys(watchedKeys, "Apollo.LongPoll.CompletedKeys");80: });81:82: // 注册 Watch Key + DeferredResultWrapper 到 `deferredResults` 中,等待配置发生变化后通知。详见 `#handleMessage(...)` 方法。83: // register all keys84: for (String key : watchedKeys) {85: this.deferredResults.put(key, deferredResultWrapper);86: }87:88: // 【TODO 6001】Tracer 日志89: logWatchedKeys(watchedKeys, "Apollo.LongPoll.RegisteredKeys");90: logger.debug("Listening {} from appId: {}, cluster: {}, namespace: {}, datacenter: {}", watchedKeys, appId, cluster, namespaces, dataCenter);91: }92:93: return deferredResultWrapper.getResult();94: }
- GET /notifications/v2 接口,具体 URL 在类上注明。
- notificationsAsString 请求参数,JSON 字符串,在【第 8 至 17 行】的代码,解析成List<ApolloConfigNotification> ,表示客户端本地的配置通知信息。
- 因为一个客户端可以订阅多个 Namespace ,所以该参数是 List 。关于 ApolloConfigNotification 类,胖友先跳到 「3. ApolloConfigNotification」 看完在回来。
- 我们可以注意到,该接口真正返回的结果也是 List<ApolloConfigNotification> ,仅返回配置发生变化的 Namespace 对应的 ApolloConfigNotification 。也就说,当有几个 配置发生变化的 Namespace ,返回几个对应的 ApolloConfigNotification 。另外,客户端接收到返回后,会增量合并到本地的配置通知信息。客户端下次请求时,使用合并后的配置通知信息。
- 注意,客户端请求时,只传递 ApolloConfigNotification 的 namespaceName +notificationId ,不传递 messages 。
- clientIp 请求参数,目前该接口暂时用不到,作为预留参数。
|