Nacos源码分析专题(三)-服务心跳

网友投稿 903 2022-10-06

Nacos源码分析专题(三)-服务心跳

Nacos源码分析专题(三)-服务心跳

文章目录

​​1. 引言​​​​2. 客户端​​

​​2.1 BeatInfo​​​​2.2. BeatReactor​​​​2.3. BeatTask​​​​2.4. 发送心跳​​

​​3.服务端​​

​​3.1.InstanceController​​​​3.2.处理心跳请求​​​​3.3.心跳异常检测​​​​3.4.主动健康检测​​

​​4.总结​​

​​4.1.acos的健康检测有两种模式​​​​4.2.那么为什么Nacos有临时和永久两种实例呢​​

1. 引言

Nacos的实例分为临时实例和永久实例两种,可以通过在yaml 文件配置:

spring: application: name: order-service cloud: nacos: discovery: ephemeral: false # 设置实例为永久实例。true:临时; false:永久 server-addr: 192.168.150.1:8845

临时实例基于心跳方式做健康检测,而永久实例则是由Nacos主动探测实例状态。

其中Nacos提供的心跳的API接口为:

接口描述:发送某个实例的心跳请求类型:PUT请求路径:

/nacos/v1/ns/instance/beat

请求参数:

名称

类型

是否必选

描述

serviceName

字符串


服务名

groupName

字符串


分组名

ephemeral

boolean


是否临时实例

beat

字符串

JSON格式字符串


错误编码:

错误代码

描述

语义

400

Bad Request

客户端请求中的语法错误

403

Forbidden

没有权限

404

Not Found

无法找到资源

500

Internal Server Error

服务器内部错误

200

OK

正常

2. 客户端

在服务注册这一节中,我们说过​​NacosNamingService​​这个类实现了服务的注册,同时也实现了服务心跳:

@Overridepublic void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException { NamingUtils.checkInstanceIsLegal(instance); String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName); // 判断是否是临时实例。 if (instance.isEphemeral()) { // 如果是临时实例,则构建心跳信息BeatInfo BeatInfo beatInfo = beatReactor.buildBeatInfo(groupedServiceName, instance); // 添加心跳任务 beatReactor.addBeatInfo(groupedServiceName, beatInfo); } serverProxy.registerService(groupedServiceName, groupName, instance);}

2.1 BeatInfo

这里的​​BeanInfo​​就包含心跳需要的各种信息:

2.2. BeatReactor

而​​BeatReactor​​这个类则维护了一个线程池:

当调用​​BeatReactor​​​的.​​addBeatInfo(groupedServiceName, beatInfo)​​方法时,就会执行心跳:

public void addBeatInfo(String serviceName, BeatInfo beatInfo) { NAMING_LOGGER.info("[BEAT] adding beat: {} to beat map.", beatInfo); String key = buildKey(serviceName, beatInfo.getIp(), beatInfo.getPort()); BeatInfo existBeat = null; //fix #1733 if ((existBeat = dom2Beat.remove(key)) != null) { existBeat.setStopped(true); } dom2Beat.put(key, beatInfo); // 利用线程池,定期执行心跳任务,周期为 beatInfo.getPeriod() executorService.schedule(new BeatTask(beatInfo), beatInfo.getPeriod(), TimeUnit.MILLISECONDS); MetricsMonitor.getDom2BeatSizeMonitor().set(dom2Beat.size());}

心跳周期的默认值在​​com.alibaba.nacos.api.common.Constants​​类中:

可以看到是5秒,默认5秒一次心跳。

2.3. BeatTask

心跳的任务封装在​​BeatTask​​​这个类中,是一个​​Runnable​​,其run方法如下:

@Overridepublic void run() { if (beatInfo.isStopped()) { return; } // 获取心跳周期 long nextTime = beatInfo.getPeriod(); try { // 发送心跳 JsonNode result = serverProxy.sendBeat(beatInfo, BeatReactor.this.lightBeatEnabled); long interval = result.get("clientBeatInterval").asLong(); boolean lightBeatEnabled = false; if (result.has(CommonParams.LIGHT_BEAT_ENABLED)) { lightBeatEnabled = result.get(CommonParams.LIGHT_BEAT_ENABLED).asBoolean(); } BeatReactor.this.lightBeatEnabled = lightBeatEnabled; if (interval > 0) { nextTime = interval; } // 判断心跳结果 int code = NamingResponseCode.OK; if (result.has(CommonParams.CODE)) { code = result.get(CommonParams.CODE).asInt(); } if (code == NamingResponseCode.RESOURCE_NOT_FOUND) { // 如果失败,则需要 重新注册实例 Instance instance = new Instance(); instance.setPort(beatInfo.getPort()); instance.setIp(beatInfo.getIp()); instance.setWeight(beatInfo.getWeight()); instance.setMetadata(beatInfo.getMetadata()); instance.setClusterName(beatInfo.getCluster()); instance.setServiceName(beatInfo.getServiceName()); instance.setInstanceId(instance.getInstanceId()); instance.setEphemeral(true); try { serverProxy.registerService(beatInfo.getServiceName(), NamingUtils.getGroupName(beatInfo.getServiceName()), instance); } catch (Exception ignore) { } } } catch (NacosException ex) { NAMING_LOGGER.error("[CLIENT-BEAT] failed to send beat: {}, code: {}, msg: {}", JacksonUtils.toJson(beatInfo), ex.getErrCode(), ex.getErrMsg()); } catch (Exception unknownEx) { NAMING_LOGGER.error("[CLIENT-BEAT] failed to send beat: {}, unknown exception msg: {}", JacksonUtils.toJson(beatInfo), unknownEx.getMessage(), unknownEx); } finally { executorService.schedule(new BeatTask(beatInfo), nextTime, TimeUnit.MILLISECONDS); }}

2.4. 发送心跳

最终心跳的发送还是通过​​NamingProxy​​​的​​sendBeat​​方法来实现:

public JsonNode sendBeat(BeatInfo beatInfo, boolean lightBeatEnabled) throws NacosException { if (NAMING_LOGGER.isDebugEnabled()) { NAMING_LOGGER.debug("[BEAT] {} sending beat to server: {}", namespaceId, beatInfo.toString()); } // 组织请求参数 Map params = new HashMap(8); Map bodyMap = new HashMap(2); if (!lightBeatEnabled) { bodyMap.put("beat", JacksonUtils.toJson(beatInfo)); } params.put(CommonParams.NAMESPACE_ID, namespaceId); params.put(CommonParams.SERVICE_NAME, beatInfo.getServiceName()); params.put(CommonParams.CLUSTER_NAME, beatInfo.getCluster()); params.put("ip", beatInfo.getIp()); params.put("port", String.valueOf(beatInfo.getPort())); // 发送请求,这个地址就是:/v1/ns/instance/beat String result = reqApi(UtilAndComs.nacosUrlBase + "/instance/beat", params, bodyMap, HttpMethod.PUT); return JacksonUtils.toObj(result);}

3.服务端

对于临时实例,服务端代码分两部分:

1)​​InstanceController​​​提供了一个接口,处理客户端的心跳请求 2)定时检测实例心跳是否按期执行

3.1.InstanceController

与服务注册时一样,在​​nacos-naming​​​模块中的​​InstanceController​​类中,定义了一个方法用来处理心跳请求:

@CanDistro@PutMapping("/beat")@Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)public ObjectNode beat(HttpServletRequest request) throws Exception { // 解析心跳的请求参数 ObjectNode result = JacksonUtils.createEmptyJsonNode(); result.put(SwitchEntry.CLIENT_BEAT_INTERVAL, switchDomain.getClientBeatInterval()); String beat = WebUtils.optional(request, "beat", StringUtils.EMPTY); RsInfo clientBeat = null; if (StringUtils.isNotBlank(beat)) { clientBeat = JacksonUtils.toObj(beat, RsInfo.class); } String clusterName = WebUtils .optional(request, CommonParams.CLUSTER_NAME, UtilsAndCommons.DEFAULT_CLUSTER_NAME); String ip = WebUtils.optional(request, "ip", StringUtils.EMPTY); int port = Integer.parseInt(WebUtils.optional(request, "port", "0")); if (clientBeat != null) { if (StringUtils.isNotBlank(clientBeat.getCluster())) { clusterName = clientBeat.getCluster(); } else { // fix #2533 clientBeat.setCluster(clusterName); } ip = clientBeat.getIp(); port = clientBeat.getPort(); } String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID); String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME); NamingUtils.checkServiceNameFormat(serviceName); Loggers.SRV_LOG.debug("[CLIENT-BEAT] full arguments: beat: {}, serviceName: {}", clientBeat, serviceName); // 尝试根据参数中的namespaceId、serviceName、clusterName、ip、port等信息 // 从Nacos的注册表中 获取实例 Instance instance = serviceManager.getInstance(namespaceId, serviceName, clusterName, ip, port); // 如果获取失败,说明心跳失败,实例尚未注册 if (instance == null) { if (clientBeat == null) { result.put(CommonParams.CODE, NamingResponseCode.RESOURCE_NOT_FOUND); return result; } Loggers.SRV_LOG.warn("[CLIENT-BEAT] The instance has been removed for health mechanism, " + "perform data compensation operations, beat: {}, serviceName: {}", clientBeat, serviceName); // 这里重新注册一个实例 instance = new Instance(); instance.setPort(clientBeat.getPort()); instance.setIp(clientBeat.getIp()); instance.setWeight(clientBeat.getWeight()); instance.setMetadata(clientBeat.getMetadata()); instance.setClusterName(clusterName); instance.setServiceName(serviceName); instance.setInstanceId(instance.getInstanceId()); instance.setEphemeral(clientBeat.isEphemeral()); serviceManager.registerInstance(namespaceId, serviceName, instance); } // 尝试基于namespaceId和serviceName从 注册表中获取Service服务 Service service = serviceManager.getService(namespaceId, serviceName); // 如果不存在,说明服务不存在,返回404 if (service == null) { throw new NacosException(NacosException.SERVER_ERROR, "service not found: " + serviceName + "@" + namespaceId); } if (clientBeat == null) { clientBeat = new RsInfo(); clientBeat.setIp(ip); clientBeat.setPort(port); clientBeat.setCluster(clusterName); } // 如果心跳没问题,开始处理心跳结果 service.processClientBeat(clientBeat); result.put(CommonParams.CODE, NamingResponseCode.OK); if (instance.containsMetadata(PreservedMetadataKeys.HEART_BEAT_INTERVAL)) { result.put(SwitchEntry.CLIENT_BEAT_INTERVAL, instance.getInstanceHeartBeatInterval()); } result.put(SwitchEntry.LIGHT_BEAT_ENABLED, switchDomain.isLightBeatEnabled()); return result;}

最终,在确认心跳请求对应的服务、实例都在的情况下,开始交给​​Service​​​类处理这次心跳请求。调用了​​Service​​​的​​processClientBeat​​方法

3.2.处理心跳请求

查看​​Service​​​的​​service.processClientBeat(clientBeat)​​;方法:

public void processClientBeat(final RsInfo rsInfo) { ClientBeatProcessor clientBeatProcessor = new ClientBeatProcessor(); clientBeatProcessor.setService(this); clientBeatProcessor.setRsInfo(rsInfo); HealthCheckReactor.scheduleNow(clientBeatProcessor);}

可以看到心跳信息被封装到了 ​​ClientBeatProcessor​​​类中,交给了​​HealthCheckReactor​​​处理,​​HealthCheckReactor​​就是对线程池的封装,不用过多查看。

关键的业务逻辑都在​​ClientBeatProcessor​​​这个类中,它是一个​​Runnable​​​,其中的​​run​​方法如下:

@Overridepublic void run() { Service service = this.service; if (Loggers.EVT_LOG.isDebugEnabled()) { Loggers.EVT_LOG.debug("[CLIENT-BEAT] processing beat: {}", rsInfo.toString()); } String ip = rsInfo.getIp(); String clusterName = rsInfo.getCluster(); int port = rsInfo.getPort(); // 获取集群信息 Cluster cluster = service.getClusterMap().get(clusterName); // 获取集群中的所有实例信息 List instances = cluster.allIPs(true); for (Instance instance : instances) { // 找到心跳的这个实例 if (instance.getIp().equals(ip) && instance.getPort() == port) { if (Loggers.EVT_LOG.isDebugEnabled()) { Loggers.EVT_LOG.debug("[CLIENT-BEAT] refresh beat: {}", rsInfo.toString()); } // 更新实例的最后一次心跳时间 lastBeat instance.setLastBeat(System.currentTimeMillis()); if (!instance.isMarked()) { if (!instance.isHealthy()) { instance.setHealthy(true); Loggers.EVT_LOG .info("service: {} {POS} {IP-ENABLED} valid: {}:{}@{}, region: {}, msg: client beat ok", cluster.getService().getName(), ip, port, cluster.getName(), UtilsAndCommons.LOCALHOST_SITE); getPushService().serviceChanged(service); } } } }}

处理心跳请求的核心就是更新心跳实例的最后一次心跳时间,lastBeat,这个会成为判断实例心跳是否过期的关键指标!

3.3.心跳异常检测

在服务注册时,一定会创建一个​​Service​​​对象,而​​Service​​​中有一个​​init​​方法,会在注册时被调用:

public void init() { // 开启心跳检测的任务 HealthCheckReactor.scheduleCheck(clientBeatCheckTask); for (Map.Entry entry : clusterMap.entrySet()) { entry.getValue().setService(this); entry.getValue().init(); }}

其中​​HealthCheckReactor.scheduleCheck​​就是执行心跳检测的定时任务:

可以看到,该任务是​​5000ms​​​执行一次,也就是​​5​​秒对实例的心跳状态做一次检测。

此处的​​ClientBeatCheckTask​​​同样是一个​​Runnable​​​,其中的​​run​​方法为:

@Overridepublic void run() { try { // 找到所有临时实例的列表 List instances = service.allIPs(true); // first set health status of instances: for (Instance instance : instances) { // 判断 心跳间隔(当前时间 - 最后一次心跳时间) 是否大于 心跳超时时间,默认15秒 if (System.currentTimeMillis() - instance.getLastBeat() > instance.getInstanceHeartBeatTimeOut()) { if (!instance.isMarked()) { if (instance.isHealthy()) { // 如果超时,标记实例为不健康 healthy = false instance.setHealthy(false); // 发布实例状态变更的事件 getPushService().serviceChanged(service); ApplicationUtils.publishEvent(new InstanceHeartbeatTimeoutEvent(this, instance)); } } } } if (!getGlobalConfig().isExpireInstance()) { return; } // then remove obsolete instances: for (Instance instance : instances) { if (instance.isMarked()) { continue; } // 判断心跳间隔(当前时间 - 最后一次心跳时间)是否大于 实例被删除的最长超时时间,默认30秒 if (System.currentTimeMillis() - instance.getLastBeat() > instance.getIpDeleteTimeout()) { // 如果是超过了30秒,则删除实例 Loggers.SRV_LOG.info("[AUTO-DELETE-IP] service: {}, ip: {}", service.getName(), JacksonUtils.toJson(instance)); deleteIp(instance); } } } catch (Exception e) { Loggers.SRV_LOG.warn("Exception while processing client beat time out.", e); }}

其中的超时时间同样是在​​com.alibaba.nacos.api.common.Constants​​这个类中:

3.4.主动健康检测

对于非临时实例(​​ephemeral=false​​),Nacos会采用主动的健康检测,定时向实例发送请求,根据响应来判断实例健康状态。

入口在​​ServiceManager​​​类中的​​registerInstance​​方法:

创建空服务时:

public void createEmptyService(String namespaceId, String serviceName, boolean local) throws NacosException { // 如果服务不存在,创建新的服务 createServiceIfAbsent(namespaceId, serviceName, local, null);}

创建服务流程:

public void createServiceIfAbsent(String namespaceId, String serviceName, boolean local, Cluster cluster) throws NacosException { // 尝试获取服务 Service service = getService(namespaceId, serviceName); if (service == null) { // 发现服务不存在,开始创建新服务 Loggers.SRV_LOG.info("creating empty service {}:{}", namespaceId, serviceName); service = new Service(); service.setName(serviceName); service.setNamespaceId(namespaceId); service.setGroupName(NamingUtils.getGroupName(serviceName)); // now validate the service. if failed, exception will be thrown service.setLastModifiedMillis(System.currentTimeMillis()); service.recalculateChecksum(); if (cluster != null) { cluster.setService(service); service.getClusterMap().put(cluster.getName(), cluster); } service.validate(); // ** 写入注册表并初始化 ** putServiceAndInit(service); if (!local) { addOrReplaceService(service); } }}

关键在​​putServiceAndInit(service)​​方法中:

private void putServiceAndInit(Service service) throws NacosException { // 将服务写入注册表 putService(service); service = getService(service.getNamespaceId(), service.getName()); // 完成服务的初始化 service.init(); consistencyService .listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), true), service); consistencyService .listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), false), service); Loggers.SRV_LOG.info("[NEW-SERVICE] {}", service.toJson());}

进入初始化逻辑:​​service.init()​​,这个会进入Service类中:

/** * Init service. */public void init() { // 开启临时实例的心跳监测任务 HealthCheckReactor.scheduleCheck(clientBeatCheckTask); // 遍历注册表中的集群 for (Map.Entry entry : clusterMap.entrySet()) { entry.getValue().setService(this); // 完成集群初识化 entry.getValue().init(); }}

这里集群的初始化 ​​entry.getValue().init()​​​;会进入​​Cluster​​​类型的​​init()​​方法:

/** * Init cluster. */public void init() { if (inited) { return; } // 创建健康检测的任务 checkTask = new HealthCheckTask(this); // 这里会开启对 非临时实例的 定时健康检测 HealthCheckReactor.scheduleCheck(checkTask); inited = true;}

这里的​​HealthCheckReactor.scheduleCheck(checkTask)​​​;会开启定时任务,对非临时实例做健康检测。检测逻辑定义在​​HealthCheckTask​​​这个类中,是一个​​Runnable​​​,其中的​​run​​方法:

public void run() { try { if (distroMapper.responsible(cluster.getService().getName()) && switchDomain .isHealthCheckEnabled(cluster.getService().getName())) { // 开始健康检测 healthCheckProcessor.process(this); // 记录日志 。。。 } } catch (Throwable e) { // 记录日志 。。。 } finally { if (!cancelled) { // 结束后,再次进行任务调度,一定延迟后执行 HealthCheckReactor.scheduleCheck(this); // 。。。 } }}

健康检测逻辑定义在​​healthCheckProcessor.process(this);​​​方法中,在​​HealthCheckProcessor​​​接口中,这个接口也有很多实现,默认是​​TcpSuperSenseProcessor​​:

进入​​TcpSuperSenseProcessor​​​的​​process​​方法:

@Overridepublic void process(HealthCheckTask task) { // 获取所有 非临时实例的 集合 List ips = task.getCluster().allIPs(false); if (CollectionUtils.isEmpty(ips)) { return; } for (Instance ip : ips) { // 封装健康检测信息到 Beat Beat beat = new Beat(ip, task); // 放入一个阻塞队列中 taskQueue.add(beat); MetricsMonitor.getTcpHealthCheckMonitor().incrementAndGet(); }}

可以看到,所有的健康检测任务都被放入一个阻塞队列,而不是立即执行了。这里又采用了异步执行的策略,可以看到​​Nacos​​中大量这样的设计。

而​​TcpSuperSenseProcessor​​​本身就是一个​​Runnable​​,在它的构造函数中会把自己放入线程池中去执行,其run方法如下:

public void run() { while (true) { try { // 处理任务 processTask(); // ... } catch (Throwable e) { SRV_LOG.error("[HEALTH-CHECK] error while processing NIO task", e); } }}

通过​​processTask​​来处理健康检测的任务:

private void processTask() throws Exception { // 将任务封装为一个 TaskProcessor,并放入集合 Collection> tasks = new LinkedList<>(); do { Beat beat = taskQueue.poll(CONNECT_TIMEOUT_MS / 2, TimeUnit.MILLISECONDS); if (beat == null) { return; } tasks.add(new TaskProcessor(beat)); } while (taskQueue.size() > 0 && tasks.size() < NIO_THREAD_COUNT * 64); // 批量处理集合中的任务 for (Future f : GlobalExecutor.invokeAllTcpSuperSenseTask(tasks)) { f.get(); }}

任务被封装到了​​TaskProcessor​​​中去执行了,​​TaskProcessor​​​是一个​​Callable​​​,其中的​​call​​方法

@Overridepublic Void call() { // 获取检测任务已经等待的时长 long waited = System.currentTimeMillis() - beat.getStartTime(); if (waited > MAX_WAIT_TIME_MILLISECONDS) { Loggers.SRV_LOG.warn("beat task waited too long: " + waited + "ms"); } SocketChannel channel = null; try { // 获取实例信息 Instance instance = beat.getIp(); // 通过NIO建立TCP连接 channel = SocketChannel.open(); channel.configureBlocking(false); // only by setting this can we make the socket close event asynchronous channel.socket().setSoLinger(false, -1); channel.socket().setReuseAddress(true); channel.socket().setKeepAlive(true); channel.socket().setTcpNoDelay(true); Cluster cluster = beat.getTask().getCluster(); int port = cluster.isUseIPPort4Check() ? instance.getPort() : cluster.getDefCkport(); channel.connect(new InetSocketAddress(instance.getIp(), port)); // 注册连接、读取事件 SelectionKey key = channel.register(selector, SelectionKey.OP_CONNECT | SelectionKey.OP_READ); key.attach(beat); keyMap.put(beat.toString(), new BeatKey(key)); beat.setStartTime(System.currentTimeMillis()); GlobalExecutor .scheduleTcpSuperSenseTask(new TimeOutTask(key), CONNECT_TIMEOUT_MS, TimeUnit.MILLISECONDS); } catch (Exception e) { beat.finishCheck(false, false, switchDomain.getTcpHealthParams().getMax(), "tcp:error:" + e.getMessage()); if (channel != null) { try { channel.close(); } catch (Exception ignore) { } } } return null;}

4.总结

4.1.acos的健康检测有两种模式

临时实例:

采用客户端心跳检测模式,心跳周期5秒 心跳间隔超过15秒则标记为不健康 心跳间隔超过30秒则从服务列表删除

永久实例:

采用服务端主动健康检测方式 周期为2000 + 5000毫秒内的随机数 检测异常只会标记为不健康,不会删除

4.2.那么为什么Nacos有临时和永久两种实例呢

以淘宝为例,双十一大促期间,流量会比平常高出很多,此时服务肯定需要增加更多实例来应对高并发,而这些实例在双十一之后就无需继续使用了,采用临时实例比较合适。而对于服务的一些常备实例,则使用永久实例更合适。

与eureka相比,Nacos与Eureka在临时实例上都是基于心跳模式实现,差别不大,主要是心跳周期不同,eureka是30秒,Nacos是5秒。

另外,Nacos支持永久实例,而Eureka不支持,Eureka只提供了心跳模式的健康监测,而没有主动检测功能。

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:关于微信小程序中弹框和模态框的实现(微信小程序模态框组件)
下一篇:微信小程序 image组件binderror使用例子与js中onerror的区别分析(微信小程序开发一个多少钱)
相关文章

 发表评论

暂时没有评论,来抢沙发吧~