Nacos Config Dynamic Refresh: Source Code Analysis

Reader submission · 934 views · 2022-10-20

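There are two main ways to obtain changed data from a remote server: push and pull. In push mode, the server proactively pushes change notifications to the client. Its advantage is timeliness: as soon as data changes on the server, the client is notified. The drawback is that the server has to keep a connection (with heartbeats) open to every client, which makes the server more complex to implement and costs it more resources to maintain those connections.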

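In pull mode, the client actively requests data from the server, for example every 10 ms. Pull mode obviously has a timeliness problem, and the polling interval is hard to choose: too short and the server is flooded with requests; too long and changes are picked up late. Moreover, if configurations rarely change and there are many clients, the vast majority of pull requests are wasted.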
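To put rough numbers on this trade-off, here is a back-of-the-envelope calculation. It assumes a single client, ignores request latency, and assumes the configuration never changes; the 29.5 s figure is the default server-side hold time explained in the next paragraph. The class name `PollingCost` is made up for illustration.

```java
/**
 * Back-of-the-envelope request counts for one client whose config never changes.
 * Illustrative arithmetic only (hypothetical class); request latency is ignored.
 */
final class PollingCost {

    private static final long MILLIS_PER_HOUR = 3_600_000L;

    /** Requests per hour when a new request starts every {@code intervalMs} milliseconds. */
    static long requestsPerHour(long intervalMs) {
        if (intervalMs <= 0) {
            throw new IllegalArgumentException("interval must be positive: " + intervalMs);
        }
        return MILLIS_PER_HOUR / intervalMs;
    }

    public static void main(String[] args) {
        // Short polling every 10 ms: almost every response says "nothing changed".
        System.out.println("pull every 10 ms       : " + requestsPerHour(10));
        // Short polling every 30 s: cheap, but a change may go unnoticed for up to 30 s.
        System.out.println("pull every 30 s        : " + requestsPerHour(30_000));
        // Long polling with a ~29.5 s hold: about as cheap as the 30 s pull while idle,
        // yet the server can answer the moment a change happens.
        System.out.println("long poll, 29.5 s hold : " + requestsPerHour(29_500));
    }
}
```

It prints 360000, 120 and 122 requests per hour respectively: an idle long-polling client costs about as much as a slow poller, but does not inherit its delay.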

Nacos uses neither of these two modes as such. Instead it uses long polling, which combines the advantages of push and pull:

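Long polling is still polling. On the client, Nacos runs a check every 10 ms by default that makes sure enough long-polling tasks are running; each task sends a request to the server and, as soon as that request returns, sends the next one. Every request carries the long-polling timeout in a request header, 30 s by default. The server does not answer right away but holds ("hangs") the request. To make sure it replies before the client times out, it subtracts 500 ms from the header value, so by default a request is held for 29.5 s. If a configuration changes during that window, the server publishes a corresponding event and, on receiving it, immediately responds to the affected clients. As a result, the client does not flood the server with polling requests, and the server does not need to maintain heartbeats with its clients: the approach balances timeliness against implementation complexity.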
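The core idea can be illustrated in a few lines of plain Java. This is a minimal in-memory sketch, not Nacos code: a hung poll is modelled as a `CompletableFuture` that completes either when the watched key changes or when the hold time (client timeout minus 500 ms) runs out. The names `LongPollingHub`, `poll` and `publishChange` are hypothetical, and the sketch leaves out HTTP entirely as well as the 10 s minimum hold that the server applies in `addLongPollingClient`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

/** Illustrative long-polling hub (hypothetical, not Nacos code). Requires Java 9+. */
final class LongPollingHub {

    // Polls currently "hung" on each watched key.
    private final Map<String, List<CompletableFuture<List<String>>>> waiting = new HashMap<>();

    /**
     * Hangs a poll for {@code key}. It completes with [key] as soon as the key changes,
     * or with an empty list ("no change") shortly before the client would give up.
     */
    CompletableFuture<List<String>> poll(String key, long clientTimeoutMs, long earlyReturnMs) {
        long holdMs = clientTimeoutMs - earlyReturnMs; // e.g. 30000 - 500 = 29500
        CompletableFuture<List<String>> pending = new CompletableFuture<>();
        synchronized (this) {
            waiting.computeIfAbsent(key, k -> new ArrayList<>()).add(pending);
        }
        return pending
                .completeOnTimeout(List.of(), holdMs, TimeUnit.MILLISECONDS)
                .whenComplete((result, error) -> forget(key, pending));
    }

    /** A config change answers every poll hung on that key immediately. */
    void publishChange(String key) {
        List<CompletableFuture<List<String>>> polls;
        synchronized (this) {
            polls = waiting.remove(key);
        }
        if (polls != null) {
            for (CompletableFuture<List<String>> p : polls) {
                p.complete(List.of(key));
            }
        }
    }

    // Drops a finished poll so timed-out futures do not pile up.
    private synchronized void forget(String key, CompletableFuture<List<String>> poll) {
        List<CompletableFuture<List<String>>> polls = waiting.get(key);
        if (polls != null) {
            polls.remove(poll);
            if (polls.isEmpty()) {
                waiting.remove(key);
            }
        }
    }

    public static void main(String[] args) {
        LongPollingHub hub = new LongPollingHub();
        CompletableFuture<List<String>> poll = hub.poll("app.yaml", 30_000, 500);
        hub.publishChange("app.yaml");   // simulated config edit on the server
        System.out.println(poll.join()); // answered at once, long before 29.5 s
    }
}
```

Completing the future from `publishChange` is the "push" half; the timeout answer is the "pull" half, after which the client simply polls again.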

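If you find source code tedious, you can skip the source walkthrough in the second half and first get an overview of the Nacos dynamic refresh flow from this flowchart: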

Nacos Config Long Polling: Source Code Analysis

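First, open the `com.alibaba.cloud.nacos.NacosConfigBootstrapConfiguration` class. As its name suggests, it is the bootstrap configuration class of Nacos Config and the entry point of Nacos Config's auto-configuration. Its `nacosConfigManager` method instantiates a `NacosConfigManager` object and registers it in the Spring container: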

```java
@Bean
@ConditionalOnMissingBean
public NacosConfigManager nacosConfigManager(
        NacosConfigProperties nacosConfigProperties) {
    return new NacosConfigManager(nacosConfigProperties);
}
```

The `NacosConfigManager` constructor calls `createConfigService`, a static method that creates the `ConfigService` singleton:

```java
/**
 * Compatible with old design,It will be perfected in the future.
 */
static ConfigService createConfigService(
        NacosConfigProperties nacosConfigProperties) {
    // Singleton created via double-checked locking
    if (Objects.isNull(service)) {
        synchronized (NacosConfigManager.class) {
            try {
                if (Objects.isNull(service)) {
                    service = NacosFactory.createConfigService(
                            nacosConfigProperties.assembleConfigServiceProperties());
                }
            }
            catch (NacosException e) {
                log.error(e.getMessage());
                throw new NacosConnectionFailureException(
                        nacosConfigProperties.getServerAddr(), e.getMessage(), e);
            }
        }
    }
    return service;
}
```
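Since `createConfigService` relies on double-checked locking, here is the pattern in isolation as a generic sketch. The class `LazySingleton` is hypothetical. The detail worth noticing is that the lazily created field is `volatile`: under the Java Memory Model, that is what makes the unsynchronized first check safe. The excerpt above does not show how the `service` field itself is declared.

```java
import java.util.function.Supplier;

/** Hypothetical generic holder illustrating double-checked locking. */
final class LazySingleton<T> {

    private final Supplier<T> factory;
    private volatile T instance; // volatile: other threads see a fully constructed object

    LazySingleton(Supplier<T> factory) {
        this.factory = factory;
    }

    T get() {
        T result = instance;           // first check, without the lock (fast path)
        if (result == null) {
            synchronized (this) {
                result = instance;     // second check, under the lock
                if (result == null) {
                    result = factory.get();
                    instance = result;
                }
            }
        }
        return result;
    }

    public static void main(String[] args) {
        int[] created = {0};
        LazySingleton<String> service = new LazySingleton<>(() -> {
            created[0]++;
            return "config-service";
        });
        service.get();
        service.get();
        System.out.println("factory calls: " + created[0]); // factory calls: 1
    }
}
```

Reading `instance` into a local once per check also avoids a second volatile read on the fast path.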

The concrete implementation of `ConfigService` is `NacosConfigService`, whose constructor mainly initializes an `HttpAgent` and a `ClientWorker`. The `ClientWorker` constructor in turn creates two scheduled thread pools:

```java
public ClientWorker(final HttpAgent agent, final ConfigFilterChainManager configFilterChainManager,
        final Properties properties) {
    this.agent = agent;
    this.configFilterChainManager = configFilterChainManager;

    // Initialize the timeout parameter
    init(properties);

    // Single-threaded scheduled pool that periodically runs checkConfigInfo
    this.executor = Executors.newScheduledThreadPool(1, new ThreadFactory() {
        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(r);
            t.setName("com.alibaba.nacos.client.Worker." + agent.getName());
            t.setDaemon(true);
            return t;
        }
    });

    // Scheduled pool sized to the number of CPU cores. It runs the LongPollingRunnable tasks
    // for the configs that need dynamic refresh, so several long-polling tasks can run in parallel
    this.executorService = Executors
            .newScheduledThreadPool(Runtime.getRuntime().availableProcessors(), new ThreadFactory() {
                @Override
                public Thread newThread(Runnable r) {
                    Thread t = new Thread(r);
                    t.setName("com.alibaba.nacos.client.Worker.longPolling." + agent.getName());
                    t.setDaemon(true);
                    return t;
                }
            });

    // Run checkConfigInfo every 10 ms (after an initial 1 ms delay)
    this.executor.scheduleWithFixedDelay(new Runnable() {
        @Override
        public void run() {
            try {
                checkConfigInfo();
            } catch (Throwable e) {
                LOGGER.error("[" + agent.getName() + "] [sub-check] rotate check error", e);
            }
        }
    }, 1L, 10L, TimeUnit.MILLISECONDS);
}

private void init(Properties properties) {
    // Long-polling timeout, 30 s by default. It is sent to the server in a request header,
    // and the server uses it to decide how long to hold the long-polling request
    timeout = Math.max(ConvertUtils.toInt(properties.getProperty(PropertyKeyConst.CONFIG_LONG_POLL_TIMEOUT),
            Constants.CONFIG_LONG_POLL_TIMEOUT), Constants.MIN_CONFIG_LONG_POLL_TIMEOUT);

    taskPenaltyTime = ConvertUtils
            .toInt(properties.getProperty(PropertyKeyConst.CONFIG_RETRY_TIME), Constants.CONFIG_RETRY_TIME);

    this.enableRemoteSyncConfig = Boolean
            .parseBoolean(properties.getProperty(PropertyKeyConst.ENABLE_REMOTE_SYNC_CONFIG));
}

/**
 * Check config info.
 */
public void checkConfigInfo() {
    // Dispatch taskes.
    // Number of configs being listened to
    int listenerSize = cacheMap.size();
    // Round up the longingTaskCount.
    // By default one LongPollingRunnable watches up to 3000 configs; a new one is only created beyond that
    int longingTaskCount = (int) Math.ceil(listenerSize / ParamUtil.getPerTaskConfigSize());
    if (longingTaskCount > currentLongingTaskCount) {
        for (int i = (int) currentLongingTaskCount; i < longingTaskCount; i++) {
            // The task list is no order.So it maybe has issues when changing.
            executorService.execute(new LongPollingRunnable(i));
        }
        currentLongingTaskCount = longingTaskCount;
    }
}
```
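The dispatch logic in `checkConfigInfo` boils down to a ceiling division plus a "never shrink" rule. The sketch below isolates that arithmetic; `TaskDispatch` and its methods are hypothetical, and 3000 is the default per-task size mentioned in the comment above. `Math.ceil` only rounds up if the division happens in floating point, which is why the per-task size is a `double` in this sketch.

```java
import java.util.Arrays;

/**
 * Sketch of checkConfigInfo's dispatch arithmetic (hypothetical names): one long-polling
 * task per perTaskSize listened configs; tasks are only ever added, never removed.
 */
final class TaskDispatch {

    private int currentTaskCount = 0;

    /** ceil(listenerSize / perTaskSize), computed in floating point. */
    static int tasksNeeded(int listenerSize, double perTaskSize) {
        return (int) Math.ceil(listenerSize / perTaskSize);
    }

    /** Returns the ids of the tasks that must be started now (empty if none). */
    int[] dispatch(int listenerSize, double perTaskSize) {
        int needed = tasksNeeded(listenerSize, perTaskSize);
        if (needed <= currentTaskCount) {
            return new int[0];
        }
        int[] toStart = new int[needed - currentTaskCount];
        for (int i = currentTaskCount; i < needed; i++) {
            // The excerpt does executorService.execute(new LongPollingRunnable(i)) here.
            toStart[i - currentTaskCount] = i;
        }
        currentTaskCount = needed;
        return toStart;
    }

    public static void main(String[] args) {
        TaskDispatch dispatcher = new TaskDispatch();
        System.out.println(Arrays.toString(dispatcher.dispatch(1, 3000)));    // [0]
        System.out.println(Arrays.toString(dispatcher.dispatch(3000, 3000))); // []
        System.out.println(Arrays.toString(dispatcher.dispatch(3001, 3000))); // [1]
    }
}
```

Growing from 3000 to 3001 listened configs starts exactly one extra task (id 1), while a shrinking listener count never stops a running task.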

`LongPollingRunnable` checks the local (failover) configuration and uses long polling to fetch from the server the dataId and group of each changed configuration. It is defined inside `com.alibaba.nacos.client.config.impl.ClientWorker`:

```java
class LongPollingRunnable implements Runnable {

    private final int taskId;

    public LongPollingRunnable(int taskId) {
        this.taskId = taskId;
    }

    @Override
    public void run() {

        List<CacheData> cacheDatas = new ArrayList<CacheData>();
        List<String> inInitializingCacheList = new ArrayList<String>();
        try {
            // check failover config
            // Iterate over the locally cached configs that belong to this task
            for (CacheData cacheData : cacheMap.values()) {
                if (cacheData.getTaskId() == taskId) {
                    cacheDatas.add(cacheData);
                    try {
                        // Check the local config
                        checkLocalConfig(cacheData);
                        if (cacheData.isUseLocalConfigInfo()) {
                            cacheData.checkListenerMd5();
                        }
                    } catch (Exception e) {
                        LOGGER.error("get local config info error", e);
                    }
                }
            }

            // check server config
            // Ask the server, via long polling, which configs have changed
            List<String> changedGroupKeys = checkUpdateDataIds(cacheDatas, inInitializingCacheList);
            if (!CollectionUtils.isEmpty(changedGroupKeys)) {
                LOGGER.info("get changedGroupKeys:" + changedGroupKeys);
            }

            for (String groupKey : changedGroupKeys) {
                String[] key = GroupKey.parseKey(groupKey);
                String dataId = key[0];
                String group = key[1];
                String tenant = null;
                if (key.length == 3) {
                    tenant = key[2];
                }
                try {
                    // Fetch the latest content of each changed config from the server
                    String[] ct = getServerConfig(dataId, group, tenant, 3000L);
                    CacheData cache = cacheMap.get(GroupKey.getKeyTenant(dataId, group, tenant));
                    cache.setContent(ct[0]);
                    if (null != ct[1]) {
                        cache.setType(ct[1]);
                    }
                    LOGGER.info("[{}] [data-received] dataId={}, group={}, tenant={}, md5={}, content={}, type={}",
                            agent.getName(), dataId, group, tenant, cache.getMd5(),
                            ContentUtils.truncateContent(ct[0]), ct[1]);
                } catch (NacosException ioe) {
                    String message = String
                            .format("[%s] [get-update] get changed config exception. dataId=%s, group=%s, tenant=%s",
                                    agent.getName(), dataId, group, tenant);
                    LOGGER.error(message, ioe);
                }
            }
            for (CacheData cacheData : cacheDatas) {
                if (!cacheData.isInitializing() || inInitializingCacheList
                        .contains(GroupKey.getKeyTenant(cacheData.dataId, cacheData.group, cacheData.tenant))) {
                    cacheData.checkListenerMd5();
                    cacheData.setInitializing(false);
                }
            }
            inInitializingCacheList.clear();

            // Start the next long-polling round immediately
            executorService.execute(this);

        } catch (Throwable e) {

            // If the rotation training task is abnormal, the next execution time of the task will be punished
            LOGGER.error("longPolling error : ", e);
            executorService.schedule(this, taskPenaltyTime, TimeUnit.MILLISECONDS);
        }
    }
}
```
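The scheduling trick in `LongPollingRunnable` is that the task re-submits itself: immediately after a normal round, and after `taskPenaltyTime` ms after a failure. The sketch below reproduces only that pattern on a plain `ScheduledExecutorService`. All names are hypothetical; the `Round` interface stands in for one `checkUpdateDataIds` round, and the round budget exists only so the demo terminates.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/**
 * Sketch of LongPollingRunnable's self-rescheduling (hypothetical names): success re-submits
 * the task at once, failure re-schedules it after a penalty delay.
 */
final class SelfReschedulingPoller implements Runnable {

    /** One long-polling round; throwing simulates a failed request. */
    interface Round {
        void run() throws Exception;
    }

    private final ScheduledExecutorService executor;
    private final Round round;
    private final long penaltyMillis;
    private final CountDownLatch roundsLeft; // demo only: lets the example stop

    SelfReschedulingPoller(ScheduledExecutorService executor, Round round, long penaltyMillis, int rounds) {
        this.executor = executor;
        this.round = round;
        this.penaltyMillis = penaltyMillis;
        this.roundsLeft = new CountDownLatch(rounds);
    }

    @Override
    public void run() {
        boolean ok;
        try {
            round.run();
            ok = true;
        } catch (Exception e) {
            ok = false;
        }
        roundsLeft.countDown();
        if (roundsLeft.getCount() == 0) {
            return; // demo only: stop after the requested number of rounds
        }
        if (ok) {
            executor.execute(this); // success: start the next long poll immediately
        } else {
            executor.schedule(this, penaltyMillis, TimeUnit.MILLISECONDS); // failure: back off
        }
    }

    /** Waits until all rounds ran; returns false on timeout or interruption. */
    boolean awaitRounds(long timeoutMillis) {
        try {
            return roundsLeft.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);
        int[] calls = {0};
        SelfReschedulingPoller poller = new SelfReschedulingPoller(pool, () -> {
            calls[0]++;
            if (calls[0] == 2) {
                throw new IllegalStateException("simulated failed poll");
            }
        }, 50, 3);
        pool.execute(poller);
        boolean finished = poller.awaitRounds(5_000);
        System.out.println("finished=" + finished + ", rounds=" + calls[0]);
        pool.shutdown();
    }
}
```

Re-submitting with `execute` rather than `scheduleWithFixedDelay` means the next poll starts the moment the previous one returns, which is what keeps a request almost always pending on the server.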

The `checkUpdateDataIds` method used above fetches the list of dataIds of the configuration files that have changed. It is also in `ClientWorker`:

```java
/**
 * Fetch the dataId list from server.
 *
 * @param cacheDatas              CacheDatas for config infomations.
 * @param inInitializingCacheList initial cache lists.
 * @return String include dataId and group (ps: it maybe null).
 * @throws Exception Exception.
 */
List<String> checkUpdateDataIds(List<CacheData> cacheDatas, List<String> inInitializingCacheList) throws Exception {
    // Build the probe string that identifies each config: dataId, group, md5 (and tenant)
    StringBuilder sb = new StringBuilder();
    for (CacheData cacheData : cacheDatas) {
        if (!cacheData.isUseLocalConfigInfo()) {
            sb.append(cacheData.dataId).append(WORD_SEPARATOR);
            sb.append(cacheData.group).append(WORD_SEPARATOR);
            if (StringUtils.isBlank(cacheData.tenant)) {
                sb.append(cacheData.getMd5()).append(LINE_SEPARATOR);
            } else {
                sb.append(cacheData.getMd5()).append(WORD_SEPARATOR);
                sb.append(cacheData.getTenant()).append(LINE_SEPARATOR);
            }
            if (cacheData.isInitializing()) {
                // It updates when cacheData occours in cacheMap by first time.
                inInitializingCacheList
                        .add(GroupKey.getKeyTenant(cacheData.dataId, cacheData.group, cacheData.tenant));
            }
        }
    }
    boolean isInitializingCacheList = !inInitializingCacheList.isEmpty();
    return checkUpdateConfigStr(sb.toString(), isInitializingCacheList);
}

/**
 * Fetch the updated dataId list from server.
 *
 * @param probeUpdateString       updated attribute string value.
 * @param isInitializingCacheList initial cache lists.
 * @return The updated dataId list(ps: it maybe null).
 * @throws IOException Exception.
 */
List<String> checkUpdateConfigStr(String probeUpdateString, boolean isInitializingCacheList) throws Exception {

    Map<String, String> params = new HashMap<String, String>(2);
    params.put(Constants.PROBE_MODIFY_REQUEST, probeUpdateString);
    Map<String, String> headers = new HashMap<String, String>(2);
    // Long-polling timeout (the header name is spelled "Pulling" in Nacos itself)
    headers.put("Long-Pulling-Timeout", "" + timeout);

    // told server do not hang me up if new initializing cacheData added in
    if (isInitializingCacheList) {
        headers.put("Long-Pulling-Timeout-No-Hangup", "true");
    }

    if (StringUtils.isBlank(probeUpdateString)) {
        return Collections.emptyList();
    }

    try {
        // In order to prevent the server from handling the delay of the client's long task,
        // increase the client's read timeout to avoid this problem.

        long readTimeoutMs = timeout + (long) Math.round(timeout >> 1);
        // Send the long-polling POST request to the server's /listener endpoint
        HttpRestResult<String> result = agent
                .httpPost(Constants.CONFIG_CONTROLLER_PATH + "/listener", headers, params, agent.getEncode(),
                        readTimeoutMs);

        if (result.ok()) {
            setHealthServer(true);
            // On success, parse the response body into dataId, group, tenant, etc. and return them
            return parseUpdateDataIdResponse(result.getData());
        } else {
            setHealthServer(false);
            LOGGER.error("[{}] [check-update] get changed dataId error, code: {}", agent.getName(),
                    result.getCode());
        }
    } catch (Exception e) {
        setHealthServer(false);
        LOGGER.error("[" + agent.getName() + "] [check-update] get changed dataId exception", e);
        throw e;
    }
    return Collections.emptyList();
}
```
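To make the request payload concrete, here is a sketch of the probe string assembled in `checkUpdateDataIds` and of the read-timeout arithmetic in `checkUpdateConfigStr`. The separator characters are an assumption of this sketch: `(char) 2` between fields and `(char) 1` between records, which is what Nacos's `WORD_SEPARATOR` and `LINE_SEPARATOR` constants are understood to contain. `ProbeString` and its methods are hypothetical, and the MD5 values in the demo are placeholders.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
 * Sketch of the listener probe payload (hypothetical class). Assumed separators:
 * (char) 2 between fields, (char) 1 between records.
 */
final class ProbeString {

    static final String WORD_SEPARATOR = Character.toString((char) 2);
    static final String LINE_SEPARATOR = Character.toString((char) 1);

    /** One record per config: dataId, group, md5 and, if present, tenant. */
    static String record(String dataId, String group, String md5, String tenant) {
        StringBuilder sb = new StringBuilder();
        sb.append(dataId).append(WORD_SEPARATOR);
        sb.append(group).append(WORD_SEPARATOR);
        sb.append(md5);
        if (tenant != null && !tenant.isBlank()) {
            sb.append(WORD_SEPARATOR).append(tenant);
        }
        return sb.append(LINE_SEPARATOR).toString();
    }

    /** Splits a probe string back into [dataId, group, md5] or [dataId, group, md5, tenant]. */
    static List<String[]> parse(String probe) {
        List<String[]> records = new ArrayList<>();
        for (String line : probe.split(LINE_SEPARATOR)) {
            if (!line.isEmpty()) {
                records.add(line.split(WORD_SEPARATOR));
            }
        }
        return records;
    }

    /** Client read timeout: the long-polling timeout plus half of it. */
    static long readTimeoutMs(long timeout) {
        return timeout + (timeout >> 1);
    }

    public static void main(String[] args) {
        String probe = record("app.yaml", "DEFAULT_GROUP", "md5-of-app", "")
                + record("db.yaml", "DEFAULT_GROUP", "md5-of-db", "dev");
        for (String[] r : parse(probe)) {
            System.out.println(Arrays.toString(r));
        }
        System.out.println("read timeout for 30000 ms: " + readTimeoutMs(30_000));
    }
}
```

With the default 30000 ms timeout the client waits up to 45000 ms for a response, comfortably longer than the 29.5 s the server holds the request. For ordinary timeout values, the excerpt's `(long) Math.round(timeout >> 1)` gives the same result as `timeout >> 1`.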

On the server, the client's request to the `listener` endpoint is handled by `com.alibaba.nacos.config.server.controller.ConfigController#listener`, which mainly delegates to `com.alibaba.nacos.config.server.controller.ConfigServletInner#doPollingConfig`:

```java
/**
 * Polling endpoint
 */
public String doPollingConfig(HttpServletRequest request, HttpServletResponse response,
                              Map<String, String> clientMd5Map, int probeRequestSize)
    throws IOException, ServletException {

    // If the client supports long polling, take the long-polling path
    if (LongPollingService.isSupportLongPolling(request)) {
        longPollingService.addLongPollingClient(request, response, clientMd5Map, probeRequestSize);
        return HttpServletResponse.SC_OK + "";
    }

    // Otherwise fall back to the short-polling logic (kept for compatibility)
    List<String> changedGroups = MD5Util.compareMd5(request, response, clientMd5Map);

    // Short-polling result (kept for compatibility)
    String oldResult = MD5Util.compareMd5OldResult(changedGroups);
    String newResult = MD5Util.compareMd5ResultString(changedGroups);

    String version = request.getHeader(Constants.CLIENT_VERSION_HEADER);
    if (version == null) {
        version = "2.0.0";
    }
    int versionNum = Protocol.getVersionNumber(version);

    /**
     * Before version 2.0.4, the result is returned in response headers
     */
    if (versionNum < START_LONGPOLLING_VERSION_NUM) {
        response.addHeader(Constants.PROBE_MODIFY_RESPONSE, oldResult);
        response.addHeader(Constants.PROBE_MODIFY_RESPONSE_NEW, newResult);
    } else {
        request.setAttribute("content", newResult);
    }

    // Disable caching
    response.setHeader("Pragma", "no-cache");
    response.setDateHeader("Expires", 0);
    response.setHeader("Cache-Control", "no-cache,no-store");
    response.setStatus(HttpServletResponse.SC_OK);
    return HttpServletResponse.SC_OK + "";
}
```
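Both the short-polling path and the first step of `addLongPollingClient` start by comparing MD5s. The sketch below shows the idea behind that comparison: a groupKey is reported as changed when the server-side MD5 differs from the one the client sent. It is not the real `MD5Util.compareMd5`; the class `Md5Compare`, the server-side map standing in for the server's config cache, and the rule that an unknown config has an empty MD5 are all assumptions of the sketch.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Sketch of an MD5-based change check (hypothetical, not Nacos's MD5Util). */
final class Md5Compare {

    static List<String> changedGroupKeys(Map<String, String> clientMd5, Map<String, String> serverMd5) {
        List<String> changed = new ArrayList<>();
        for (Map.Entry<String, String> entry : clientMd5.entrySet()) {
            // Assumption of this sketch: a config the server does not know has an empty MD5.
            String serverSide = serverMd5.getOrDefault(entry.getKey(), "");
            if (!serverSide.equals(entry.getValue())) {
                changed.add(entry.getKey());
            }
        }
        return changed;
    }

    public static void main(String[] args) {
        // Placeholder MD5 values; group keys written as dataId+group.
        Map<String, String> client = new LinkedHashMap<>();
        client.put("app.yaml+DEFAULT_GROUP", "aaa");
        client.put("db.yaml+DEFAULT_GROUP", "bbb");
        Map<String, String> server = Map.of(
                "app.yaml+DEFAULT_GROUP", "aaa",
                "db.yaml+DEFAULT_GROUP", "ccc");
        System.out.println(changedGroupKeys(client, server)); // [db.yaml+DEFAULT_GROUP]
    }
}
```

If this list is non-empty the server can answer at once; only when it is empty does a long-polling request get held.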

We mainly care about the long-polling path above, `com.alibaba.nacos.config.server.service.LongPollingService#addLongPollingClient`:

```java
public void addLongPollingClient(HttpServletRequest req, HttpServletResponse rsp, Map<String, String> clientMd5Map,
                                 int probeRequestSize) {

    String str = req.getHeader(LongPollingService.LONG_POLLING_HEADER);
    String noHangUpFlag = req.getHeader(LongPollingService.LONG_POLLING_NO_HANG_UP_HEADER);
    String appName = req.getHeader(RequestUtil.CLIENT_APPNAME_HEADER);
    String tag = req.getHeader("Vipserver-Tag");
    int delayTime = SwitchService.getSwitchInteger(SwitchService.FIXED_DELAY_TIME, 500);
    /**
     * Respond 500 ms early to avoid a client-side timeout. Changed by @qiaoyi.dingqy on 2013.10.22: add delay time for LoadBalance
     */
    long timeout = Math.max(10000, Long.parseLong(str) - delayTime);
    if (isFixedPolling()) {
        timeout = Math.max(10000, getFixedPollingInterval());
        // do nothing but set fix polling timeout
    } else {
        long start = System.currentTimeMillis();
        List<String> changedGroups = MD5Util.compareMd5(req, rsp, clientMd5Map);
        if (changedGroups.size() > 0) {
            generateResponse(req, rsp, changedGroups);
            LogUtil.clientLog.info("{}|{}|{}|{}|{}|{}|{}",
                System.currentTimeMillis() - start, "instant", RequestUtil.getRemoteIp(req), "polling",
                clientMd5Map.size(), probeRequestSize, changedGroups.size());
            return;
        } else if (noHangUpFlag != null && noHangUpFlag.equalsIgnoreCase(TRUE_STR)) {
            LogUtil.clientLog.info("{}|{}|{}|{}|{}|{}|{}", System.currentTimeMillis() - start, "nohangup",
                RequestUtil.getRemoteIp(req), "polling", clientMd5Map.size(), probeRequestSize,
                changedGroups.size());
            return;
        }
    }
    String ip = RequestUtil.getRemoteIp(req);
    // Must be called from the HTTP thread; otherwise the container sends the response as soon as the thread returns
    final AsyncContext asyncContext = req.startAsync();
    // AsyncContext.setTimeout() is not precise, so disable it (0 = no timeout) and control the timeout manually
    asyncContext.setTimeout(0L);

    // ClientLongPolling#run puts this ClientLongPolling instance (which carries the request's details) into allSubs,
    // then schedules another Runnable to run after `timeout` ms (29.5 s by default). If no relevant config change
    // has arrived by then, that Runnable responds to the client and removes the instance from allSubs
    scheduler.execute(
        new ClientLongPolling(asyncContext, clientMd5Map, ip, probeRequestSize, timeout, appName, tag));
}
```
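The hold-time arithmetic at the top of `addLongPollingClient` is easy to misread, so here it is isolated. `HoldTime` and `holdMillis` are hypothetical names; the body mirrors the excerpt: the client's header value minus the delay (500 ms by default), never less than 10 s, and in fixed-polling mode the fixed interval (again at least 10 s) instead.

```java
/** Sketch of addLongPollingClient's hold-time calculation (hypothetical names). */
final class HoldTime {

    private static final long MIN_HOLD_MS = 10_000; // the 10000 floor in the excerpt

    /**
     * @param timeoutHeader   value of the client's "Long-Pulling-Timeout" header, in ms
     * @param delayMs         early-return margin (the FIXED_DELAY_TIME switch, 500 by default)
     * @param fixedPolling    whether the server runs in fixed-polling mode
     * @param fixedIntervalMs the polling interval used in fixed-polling mode
     */
    static long holdMillis(String timeoutHeader, int delayMs, boolean fixedPolling, long fixedIntervalMs) {
        long timeout = Math.max(MIN_HOLD_MS, Long.parseLong(timeoutHeader) - delayMs);
        if (fixedPolling) {
            timeout = Math.max(MIN_HOLD_MS, fixedIntervalMs);
        }
        return timeout;
    }

    public static void main(String[] args) {
        System.out.println(holdMillis("30000", 500, false, 0)); // 29500: the default client timeout
        System.out.println(holdMillis("5000", 500, false, 0));  // 10000: raised to the 10 s floor
    }
}
```

The 10 s floor means a client that sends a very short timeout does not force the server into near-short-polling.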

`LongPollingService` extends `AbstractEventListener`, which means it can receive event notifications. Its `com.alibaba.nacos.config.server.service.LongPollingService#onEvent` method shows that it is interested in `LocalDataChangeEvent`:

```java
@Override
public void onEvent(Event event) {
    if (isFixedPolling()) {
        // ignore
    } else {
        if (event instanceof LocalDataChangeEvent) {
            LocalDataChangeEvent evt = (LocalDataChangeEvent) event;
            scheduler.execute(new DataChangeTask(evt.groupKey, evt.isBeta, evt.betaIps));
        }
    }
}
```

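When a configuration is modified in Nacos, a `LocalDataChangeEvent` is produced and `LongPollingService` receives it. It then iterates over `allSubs`, finds the matching pending requests, and returns the `groupKey` to those clients. The code is in `DataChangeTask`: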

```java
class DataChangeTask implements Runnable {
    @Override
    public void run() {
        try {
            ConfigService.getContentBetaMd5(groupKey);
            for (Iterator<ClientLongPolling> iter = allSubs.iterator(); iter.hasNext(); ) {
                ClientLongPolling clientSub = iter.next();
                if (clientSub.clientMd5Map.containsKey(groupKey)) {
                    // Beta release: skip clients that are not in the beta IP list
                    if (isBeta && !betaIps.contains(clientSub.ip)) {
                        continue;
                    }

                    // Tag release: skip clients whose tag does not match
                    if (StringUtils.isNotBlank(tag) && !tag.equals(clientSub.tag)) {
                        continue;
                    }

                    getRetainIps().put(clientSub.ip, System.currentTimeMillis());
                    iter.remove(); // Remove the subscription
                    LogUtil.clientLog.info("{}|{}|{}|{}|{}|{}|{}",
                        (System.currentTimeMillis() - changeTime),
                        "in-advance",
                        RequestUtil.getRemoteIp((HttpServletRequest) clientSub.asyncContext.getRequest()),
                        "polling",
                        clientSub.clientMd5Map.size(), clientSub.probeRequestSize, groupKey);
                    clientSub.sendResponse(Arrays.asList(groupKey));
                }
            }
        } catch (Throwable t) {
            LogUtil.defaultLog.error("data change error:" + t.getMessage(), t.getCause());
        }
    }

    DataChangeTask(String groupKey) {
        this(groupKey, false, null);
    }

    DataChangeTask(String groupKey, boolean isBeta, List<String> betaIps) {
        this(groupKey, isBeta, betaIps, null);
    }

    DataChangeTask(String groupKey, boolean isBeta, List<String> betaIps, String tag) {
        this.groupKey = groupKey;
        this.isBeta = isBeta;
        this.betaIps = betaIps;
        this.tag = tag;
    }

    final String groupKey;
    final long changeTime = System.currentTimeMillis();
    final boolean isBeta;
    final List<String> betaIps;
    final String tag;
}
```
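The matching rules in `DataChangeTask` can be tried out without a servlet container. In the sketch below, a hypothetical `Subscriber` class stands in for `ClientLongPolling` and a queue stands in for `allSubs`: a hung poll is answered (and removed) only if it watches the changed key and passes the beta-IP and tag filters; every other poll keeps waiting. `ChangeMatcher` and its methods are made-up names.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;

/** Sketch of DataChangeTask's matching rules over hypothetical subscribers. */
final class ChangeMatcher {

    /** Hypothetical stand-in for ClientLongPolling: who is waiting, and for which keys. */
    static final class Subscriber {
        final String ip;
        final String tag;
        final Set<String> watchedKeys;

        Subscriber(String ip, String tag, Set<String> watchedKeys) {
            this.ip = ip;
            this.tag = tag;
            this.watchedKeys = watchedKeys;
        }
    }

    private final Queue<Subscriber> allSubs = new ConcurrentLinkedQueue<>();

    void add(Subscriber sub) {
        allSubs.add(sub);
    }

    int pending() {
        return allSubs.size();
    }

    /** Removes and returns the subscribers that should be told about groupKey. */
    List<Subscriber> onChange(String groupKey, boolean isBeta, List<String> betaIps, String tag) {
        List<Subscriber> notified = new ArrayList<>();
        for (Iterator<Subscriber> it = allSubs.iterator(); it.hasNext(); ) {
            Subscriber sub = it.next();
            if (!sub.watchedKeys.contains(groupKey)) {
                continue;
            }
            if (isBeta && !betaIps.contains(sub.ip)) {
                continue; // beta release: only the listed IPs are told
            }
            if (tag != null && !tag.isBlank() && !tag.equals(sub.tag)) {
                continue; // tag release: only matching tags are told
            }
            it.remove();       // one-shot: the client opens a new long poll afterwards
            notified.add(sub); // the excerpt calls clientSub.sendResponse(...) at this point
        }
        return notified;
    }

    public static void main(String[] args) {
        ChangeMatcher matcher = new ChangeMatcher();
        matcher.add(new Subscriber("10.0.0.1", null, Set.of("app.yaml+DEFAULT_GROUP")));
        matcher.add(new Subscriber("10.0.0.2", null, Set.of("app.yaml+DEFAULT_GROUP")));
        matcher.add(new Subscriber("10.0.0.3", null, Set.of("db.yaml+DEFAULT_GROUP")));
        // Beta release of app.yaml to 10.0.0.2 only
        List<Subscriber> told = matcher.onChange("app.yaml+DEFAULT_GROUP", true, List.of("10.0.0.2"), null);
        System.out.println(told.get(0).ip + " notified, " + matcher.pending() + " polls still hung");
    }
}
```

Because a notified poll is removed, each long-polling request receives at most one change notification; the client's next request re-registers it.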

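Once the client receives the changed dataId + group, it fetches the latest configuration data from the server, updates its local `CacheData`, and then notifies the listeners of the change. That completes the flow.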

Method that fetches the latest configuration from the server: `com.alibaba.nacos.client.config.impl.ClientWorker#getServerConfig`

Method that notifies the listeners of the change: `com.alibaba.nacos.client.config.impl.CacheData#checkListenerMd5`
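The idea behind `checkListenerMd5` can be sketched as follows: each listener remembers the MD5 it was last called with and is called again only when the cached content's MD5 differs, so repeated checks never produce duplicate notifications. This is a hypothetical illustration, not the `CacheData` implementation; `CachedConfig`, `ListenerWrap` and their methods are made-up names, and listeners are plain `Consumer<String>` callbacks.

```java
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Sketch of MD5-gated listener notification (hypothetical names, not Nacos code). */
final class CachedConfig {

    private static final class ListenerWrap {
        final Consumer<String> listener;
        String lastCallMd5;

        ListenerWrap(Consumer<String> listener, String md5) {
            this.listener = listener;
            this.lastCallMd5 = md5;
        }
    }

    private final List<ListenerWrap> listeners = new ArrayList<>();
    private String content = "";
    private String md5 = md5Hex("");

    static String md5Hex(String text) {
        try {
            byte[] digest = MessageDigest.getInstance("MD5").digest(text.getBytes(StandardCharsets.UTF_8));
            return String.format("%032x", new BigInteger(1, digest));
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("MD5 not available on this JVM", e);
        }
    }

    /** New listeners start at the current MD5, so they are not called for old content. */
    synchronized void addListener(Consumer<String> listener) {
        listeners.add(new ListenerWrap(listener, md5));
    }

    /** Called after fresh content arrives from the server (cf. ClientWorker#getServerConfig). */
    synchronized void setContent(String newContent) {
        content = newContent;
        md5 = md5Hex(newContent);
    }

    /** Calls only the listeners whose last-seen MD5 differs from the current one. */
    synchronized void checkListenerMd5() {
        for (ListenerWrap wrap : listeners) {
            if (!md5.equals(wrap.lastCallMd5)) {
                wrap.lastCallMd5 = md5;
                wrap.listener.accept(content);
            }
        }
    }

    public static void main(String[] args) {
        CachedConfig cfg = new CachedConfig();
        List<String> received = new ArrayList<>();
        cfg.addListener(received::add);
        cfg.setContent("timeout=30");
        cfg.checkListenerMd5();
        cfg.checkListenerMd5();        // unchanged MD5, so no duplicate notification
        System.out.println(received);  // [timeout=30]
    }
}
```

Keying notifications on the content hash rather than on "a response arrived" makes the check idempotent, which matters because `LongPollingRunnable` calls it on every round.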

Finally, here is a diagram mapping each step of the flow to the corresponding source code:
