Nacos源码分析专题(四)-服务发现

网友投稿 1014 2022-11-09

Nacos源码分析专题(四)-服务发现

Nacos源码分析专题(四)-服务发现

1.客户端

1.1.定时更新服务列表

1.1.1.NacosNamingService

在前面我们讲到一个类​​NacosNamingService​​,这个类不仅仅提供了服务注册功能,同样提供了服务发现的功能。

多个重载的方法最终都会进入一个方法:

@Overridepublic List getAllInstances(String serviceName, String groupName, List clusters, boolean subscribe) throws NacosException { ServiceInfo serviceInfo; // 1.判断是否需要订阅服务信息(默认为 true) if (subscribe) { // 1.1.订阅服务信息 serviceInfo = hostReactor.getServiceInfo(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, ",")); } else { // 1.2.直接去nacos拉取服务信息 serviceInfo = hostReactor .getServiceInfoDirectlyFromServer(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, ",")); } // 2.从服务信息中获取实例列表并返回 List list; if (serviceInfo == null || CollectionUtils.isEmpty(list = serviceInfo.getHosts())) { return new ArrayList(); } return list;}

1.1.2.HostReactor

进入订阅服务消息,这里是由​​HostReactor​​​类的​​getServiceInfo()​​方法来实现的:

public ServiceInfo getServiceInfo(final String serviceName, final String clusters) { NAMING_LOGGER.debug("failover-mode: " + failoverReactor.isFailoverSwitch()); // 由 服务名@@集群名拼接 key String key = ServiceInfo.getKey(serviceName, clusters); if (failoverReactor.isFailoverSwitch()) { return failoverReactor.getService(key); } // 读取本地服务列表的缓存,缓存是一个Map,格式:Map ServiceInfo serviceObj = getServiceInfo0(serviceName, clusters); // 判断缓存是否存在 if (null == serviceObj) { // 不存在,创建空ServiceInfo serviceObj = new ServiceInfo(serviceName, clusters); // 放入缓存 serviceInfoMap.put(serviceObj.getKey(), serviceObj); // 放入待更新的服务列表(updatingMap)中 updatingMap.put(serviceName, new Object()); // 立即更新服务列表 updateServiceNow(serviceName, clusters); // 从待更新列表中移除 updatingMap.remove(serviceName); } else if (updatingMap.containsKey(serviceName)) { // 缓存中有,但是需要更新 if (UPDATE_HOLD_INTERVAL > 0) { // hold a moment waiting for update finish 等待5秒中,待更新完成 synchronized (serviceObj) { try { serviceObj.wait(UPDATE_HOLD_INTERVAL); } catch (InterruptedException e) { NAMING_LOGGER .error("[getServiceInfo] serviceName:" + serviceName + ", clusters:" + clusters, e); } } } } // 开启定时更新服务列表的功能 scheduleUpdateIfAbsent(serviceName, clusters); // 返回缓存中的服务信息 return serviceInfoMap.get(serviceObj.getKey());}

基本逻辑就是先从本地缓存读,根据结果来选择:

public void updateService(String serviceName, String clusters) throws NacosException { ServiceInfo oldService = getServiceInfo0(serviceName, clusters); try { // 基于ServerProxy发起远程调用,查询服务列表 String result = serverProxy.queryList(serviceName, clusters, pushReceiver.getUdpPort(), false); if (StringUtils.isNotEmpty(result)) { // 处理查询结果 processServicejson(result); } } finally { if (oldService != null) { synchronized (oldService) { oldService.notifyAll(); } } }}

1.1.3.ServerProxy

而​​ServerProxy​​​的​​queryList​​方法如下:

public String queryList(String serviceName, String clusters, int udpPort, boolean healthyOnly) throws NacosException { // 准备请求参数 final Map params = new HashMap(8); params.put(CommonParams.NAMESPACE_ID, namespaceId); params.put(CommonParams.SERVICE_NAME, serviceName); params.put("clusters", clusters); params.put("udpPort", String.valueOf(udpPort)); params.put("clientIP", NetUtils.localIP()); params.put("healthyOnly", String.valueOf(healthyOnly)); // 发起请求,地址与API接口一致 return reqApi(UtilAndComs.nacosUrlBase + "/instance/list", params, HttpMethod.GET);}

1.2.处理服务变更通知

除了定时更新服务列表的功能外,Nacos还支持服务列表变更时的主动推送功能。

在​​HostReactor​​类的构造函数中,有非常重要的几个步骤:

基本思路是:

通过​​PushReceiver​​监听服务端推送的变更数据解析数据后,通过​​NotifyCenter​​发布服务变更的事件​​InstanceChangeNotifier​​监听变更事件,完成对服务列表的更新

1.2.1.PushReceiver

我们先看​​PushReceiver​​​,这个类会以​​UDP​​​方式接收​​Nacos​​服务端推送的服务变更数据。

先看构造函数:

public PushReceiver(HostReactor hostReactor) { try { this.hostReactor = hostReactor; // 创建 UDP客户端 String udpPort = getPushReceiverUdpPort(); if (StringUtils.isEmpty(udpPort)) { this.udpSocket = new DatagramSocket(); } else { this.udpSocket = new DatagramSocket(new InetSocketAddress(Integer.parseInt(udpPort))); } // 准备线程池 this.executorService = new ScheduledThreadPoolExecutor(1, new ThreadFactory() { @Override public Thread newThread(Runnable r) { Thread thread = new Thread(r); thread.setDaemon(true); thread.setName("com.alibaba.nacos.naming.push.receiver"); return thread; } }); // 开启线程任务,准备接收变更数据 this.executorService.execute(this); } catch (Exception e) { NAMING_LOGGER.error("[NA] init udp socket failed", e); }}

​​PushReceiver​​​构造函数中基于线程池来运行任务。这是因为​​PushReceiver​​​本身也是一个​​Runnable​​,其中的run方法业务逻辑如下:

@Overridepublic void run() { while (!closed) { try { // byte[] is initialized with 0 full filled by default byte[] buffer = new byte[UDP_MSS]; DatagramPacket packet = new DatagramPacket(buffer, buffer.length); // 接收推送数据 udpSocket.receive(packet); // 解析为json字符串 String json = new String(IoUtils.tryDecompress(packet.getData()), UTF_8).trim(); NAMING_LOGGER.info("received push data: " + json + " from " + packet.getAddress().toString()); // 反序列化为对象 PushPacket pushPacket = JacksonUtils.toObj(json, PushPacket.class); String ack; if ("dom".equals(pushPacket.type) || "service".equals(pushPacket.type)) { // 交给 HostReactor去处理 hostReactor.processServiceJson(pushPacket.data); // send ack to server 发送ACK回执,略。。 } catch (Exception e) { if (closed) { return; } NAMING_LOGGER.error("[NA] error while receiving push data", e); } }}

1.2.2.HostReactor

通知数据的处理由交给了​​HostReactor​​​的​​processServiceJson​​方法:

public ServiceInfo processServiceJson(String json) { // 解析出ServiceInfo信息 ServiceInfo serviceInfo = JacksonUtils.toObj(json, ServiceInfo.class); String serviceKey = serviceInfo.getKey(); if (serviceKey == null) { return null; } // 查询缓存中的 ServiceInfo ServiceInfo oldService = serviceInfoMap.get(serviceKey); // 如果缓存存在,则需要校验哪些数据要更新 boolean changed = false; if (oldService != null) { // 拉取的数据是否已经过期 if (oldService.getLastRefTime() > serviceInfo.getLastRefTime()) { NAMING_LOGGER.warn("out of date data received, old-t: " + oldService.getLastRefTime() + ", new-t: " + serviceInfo.getLastRefTime()); } // 放入缓存 serviceInfoMap.put(serviceInfo.getKey(), serviceInfo); // 中间是缓存与新数据的对比,得到newHosts:新增的实例;remvHosts:待移除的实例; // modHosts:需要修改的实例 if (newHosts.size() > 0 || remvHosts.size() > 0 || modHosts.size() > 0) { // 发布实例变更的事件 NotifyCenter.publishEvent(new InstancesChangeEvent( serviceInfo.getName(), serviceInfo.getGroupName(), serviceInfo.getClusters(), serviceInfo.getHosts())); DiskCache.write(serviceInfo, cacheDir); } } else { // 本地缓存不存在 changed = true; // 放入缓存 serviceInfoMap.put(serviceInfo.getKey(), serviceInfo); // 直接发布实例变更的事件 NotifyCenter.publishEvent(new InstancesChangeEvent( serviceInfo.getName(), serviceInfo.getGroupName(), serviceInfo.getClusters(), serviceInfo.getHosts())); serviceInfo.setJsonFromServer(json); DiskCache.write(serviceInfo, cacheDir); } // 。。。 return serviceInfo;}

2.服务端

2.1.拉取服务列表接口

在介绍的​​InstanceController​​中,提供了拉取服务列表的接口:

/** * Get all instance of input service. * * @param request request * @return list of instance * @throws Exception any error during list */@GetMapping("/list")@Secured(parser = NamingResourceParser.class, action = ActionTypes.READ)public ObjectNode list(HttpServletRequest request) throws Exception { // 从request中获取namespaceId和serviceName String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID); String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME); NamingUtils.checkServiceNameFormat(serviceName); String agent = WebUtils.getUserAgent(request); String clusters = WebUtils.optional(request, "clusters", StringUtils.EMPTY); String clientIP = WebUtils.optional(request, "clientIP", StringUtils.EMPTY); // 获取客户端的 UDP端口 int udpPort = Integer.parseInt(WebUtils.optional(request, "udpPort", "0")); String env = WebUtils.optional(request, "env", StringUtils.EMPTY); boolean isCheck = Boolean.parseBoolean(WebUtils.optional(request, "isCheck", "false")); String app = WebUtils.optional(request, "app", StringUtils.EMPTY); String tenant = WebUtils.optional(request, "tid", StringUtils.EMPTY); boolean healthyOnly = Boolean.parseBoolean(WebUtils.optional(request, "healthyOnly", "false")); // 获取服务列表 return doSrvIpxt(namespaceId, serviceName, agent, clusters, clientIP, udpPort, env, isCheck, app, tenant, healthyOnly);}

进入​​doSrvIpxt()​​方法来获取服务列表:

public ObjectNode doSrvIpxt(String namespaceId, String serviceName, String agent, String clusters, String clientIP, int udpPort, String env, boolean isCheck, String app, String tid, boolean healthyOnly) throws Exception { ClientInfo clientInfo = new ClientInfo(agent); ObjectNode result = JacksonUtils.createEmptyJsonNode(); // 获取服务列表信息 Service service = serviceManager.getService(namespaceId, serviceName); long cacheMillis = switchDomain.getDefaultCacheMillis(); // now try to enable the push try { if (udpPort > 0 && pushService.canEnablePush(agent)) { // 添加当前客户端 IP、UDP端口到 PushService 中 pushService .addClient(namespaceId, serviceName, clusters, agent, new InetSocketAddress(clientIP, udpPort), pushDataSource, tid, app); cacheMillis = switchDomain.getPushCacheMillis(serviceName); } } catch (Exception e) { Loggers.SRV_LOG .error("[NACOS-API] failed to added push client {}, {}:{}", clientInfo, clientIP, udpPort, e); cacheMillis = switchDomain.getDefaultCacheMillis(); } if (service == null) { // 如果没找到,返回空 if (Loggers.SRV_LOG.isDebugEnabled()) { Loggers.SRV_LOG.debug("no instance to serve for service: {}", serviceName); } result.put("name", serviceName); result.put("clusters", clusters); result.put("cacheMillis", cacheMillis); result.replace("hosts", JacksonUtils.createEmptyArrayNode()); return result; } // 结果的检测,异常实例的剔除等逻辑省略 // 最终封装结果并返回 。。。 result.replace("hosts", hosts); if (clientInfo.type == ClientInfo.ClientType.JAVA && clientInfo.version.compareTo(VersionUtil.parseVersion("1.0.0")) >= 0) { result.put("dom", serviceName); } else { result.put("dom", NamingUtils.getServiceName(serviceName)); } result.put("name", serviceName); result.put("cacheMillis", cacheMillis); result.put("lastRefTime", System.currentTimeMillis()); result.put("checksum", service.getChecksum()); result.put("useSpecifiedURL", false); result.put("clusters", clusters); result.put("env", env); result.replace("metadata", JacksonUtils.transferToJsonNode(service.getMetadata())); return result;}

2.2.发布服务变更的UDP通知

在上一节中,​​InstanceController​​​中的​​doSrvIpxt()​​方法中,有这样一行代码:

pushService.addClient(namespaceId, serviceName, clusters, agent, new InetSocketAddress(clientIP, udpPort), pushDataSource, tid, app);

其实是把消费者的​​UDP​​​端口、IP等信息封装为一个​​PushClient​​​对象,存储​​PushService​​中。方便以后服务变更后推送消息。

​​PushService​​​类本身实现了​​ApplicationListener​​接口:

这个是事件-接口,监听的是​​ServiceChangeEvent​​(服务变更事件)。当服务列表变化时,就会通知我们:

3.总结

Nacos的服务发现分为两种模式:

模式一:主动拉取模式,消费者定期主动从​​Nacos​​拉取服务列表并缓存起来,再服务调用时优先读取本地缓存中的服务列表。

模式二:订阅模式,消费者订阅​​Nacos​​​中的服务列表,并基于​​UDP协议​​​来接收服务变更通知。当Nacos中的服务列表更新时,会发送​​UDP广播​​给所有订阅者。

与Eureka相比,Nacos的订阅模式服务状态更新更及时,消费者更容易及时发现服务列表的变化,剔除故障服务。

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:​Python是如何表示时间的?2个模块、3种方式,1文看懂~
下一篇:文件查询匹配神器 【glob.js】 实用教程
相关文章

 发表评论

暂时没有评论,来抢沙发吧~