微信开发中 ACCESS TOKEN 过期失效的解决方案详解
1014
2022-11-09
Nacos源码分析专题(四)-服务发现
1.客户端
1.1.定时更新服务列表
1.1.1.NacosNamingService
在前面我们讲到一个类NacosNamingService,这个类不仅仅提供了服务注册功能,同样提供了服务发现的功能。
多个重载的方法最终都会进入一个方法:
@Overridepublic List
1.1.2.HostReactor
进入订阅服务消息,这里是由HostReactor类的getServiceInfo()方法来实现的:
public ServiceInfo getServiceInfo(final String serviceName, final String clusters) { NAMING_LOGGER.debug("failover-mode: " + failoverReactor.isFailoverSwitch()); // 由 服务名@@集群名拼接 key String key = ServiceInfo.getKey(serviceName, clusters); if (failoverReactor.isFailoverSwitch()) { return failoverReactor.getService(key); } // 读取本地服务列表的缓存,缓存是一个Map,格式:Map
基本逻辑就是先从本地缓存读,根据结果来选择:
public void updateService(String serviceName, String clusters) throws NacosException { ServiceInfo oldService = getServiceInfo0(serviceName, clusters); try { // 基于ServerProxy发起远程调用,查询服务列表 String result = serverProxy.queryList(serviceName, clusters, pushReceiver.getUdpPort(), false); if (StringUtils.isNotEmpty(result)) { // 处理查询结果 processServicejson(result); } } finally { if (oldService != null) { synchronized (oldService) { oldService.notifyAll(); } } }}
1.1.3.ServerProxy
而ServerProxy的queryList方法如下:
public String queryList(String serviceName, String clusters, int udpPort, boolean healthyOnly) throws NacosException { // 准备请求参数 final Map
1.2.处理服务变更通知
除了定时更新服务列表的功能外,Nacos还支持服务列表变更时的主动推送功能。
在HostReactor类的构造函数中,有非常重要的几个步骤:
基本思路是:
通过PushReceiver监听服务端推送的变更数据解析数据后,通过NotifyCenter发布服务变更的事件InstanceChangeNotifier监听变更事件,完成对服务列表的更新
1.2.1.PushReceiver
我们先看PushReceiver,这个类会以UDP方式接收Nacos服务端推送的服务变更数据。
先看构造函数:
public PushReceiver(HostReactor hostReactor) { try { this.hostReactor = hostReactor; // 创建 UDP客户端 String udpPort = getPushReceiverUdpPort(); if (StringUtils.isEmpty(udpPort)) { this.udpSocket = new DatagramSocket(); } else { this.udpSocket = new DatagramSocket(new InetSocketAddress(Integer.parseInt(udpPort))); } // 准备线程池 this.executorService = new ScheduledThreadPoolExecutor(1, new ThreadFactory() { @Override public Thread newThread(Runnable r) { Thread thread = new Thread(r); thread.setDaemon(true); thread.setName("com.alibaba.nacos.naming.push.receiver"); return thread; } }); // 开启线程任务,准备接收变更数据 this.executorService.execute(this); } catch (Exception e) { NAMING_LOGGER.error("[NA] init udp socket failed", e); }}
PushReceiver构造函数中基于线程池来运行任务。这是因为PushReceiver本身也是一个Runnable,其中的run方法业务逻辑如下:
@Overridepublic void run() { while (!closed) { try { // byte[] is initialized with 0 full filled by default byte[] buffer = new byte[UDP_MSS]; DatagramPacket packet = new DatagramPacket(buffer, buffer.length); // 接收推送数据 udpSocket.receive(packet); // 解析为json字符串 String json = new String(IoUtils.tryDecompress(packet.getData()), UTF_8).trim(); NAMING_LOGGER.info("received push data: " + json + " from " + packet.getAddress().toString()); // 反序列化为对象 PushPacket pushPacket = JacksonUtils.toObj(json, PushPacket.class); String ack; if ("dom".equals(pushPacket.type) || "service".equals(pushPacket.type)) { // 交给 HostReactor去处理 hostReactor.processServiceJson(pushPacket.data); // send ack to server 发送ACK回执,略。。 } catch (Exception e) { if (closed) { return; } NAMING_LOGGER.error("[NA] error while receiving push data", e); } }}
1.2.2.HostReactor
通知数据的处理由交给了HostReactor的processServiceJson方法:
public ServiceInfo processServiceJson(String json) { // 解析出ServiceInfo信息 ServiceInfo serviceInfo = JacksonUtils.toObj(json, ServiceInfo.class); String serviceKey = serviceInfo.getKey(); if (serviceKey == null) { return null; } // 查询缓存中的 ServiceInfo ServiceInfo oldService = serviceInfoMap.get(serviceKey); // 如果缓存存在,则需要校验哪些数据要更新 boolean changed = false; if (oldService != null) { // 拉取的数据是否已经过期 if (oldService.getLastRefTime() > serviceInfo.getLastRefTime()) { NAMING_LOGGER.warn("out of date data received, old-t: " + oldService.getLastRefTime() + ", new-t: " + serviceInfo.getLastRefTime()); } // 放入缓存 serviceInfoMap.put(serviceInfo.getKey(), serviceInfo); // 中间是缓存与新数据的对比,得到newHosts:新增的实例;remvHosts:待移除的实例; // modHosts:需要修改的实例 if (newHosts.size() > 0 || remvHosts.size() > 0 || modHosts.size() > 0) { // 发布实例变更的事件 NotifyCenter.publishEvent(new InstancesChangeEvent( serviceInfo.getName(), serviceInfo.getGroupName(), serviceInfo.getClusters(), serviceInfo.getHosts())); DiskCache.write(serviceInfo, cacheDir); } } else { // 本地缓存不存在 changed = true; // 放入缓存 serviceInfoMap.put(serviceInfo.getKey(), serviceInfo); // 直接发布实例变更的事件 NotifyCenter.publishEvent(new InstancesChangeEvent( serviceInfo.getName(), serviceInfo.getGroupName(), serviceInfo.getClusters(), serviceInfo.getHosts())); serviceInfo.setJsonFromServer(json); DiskCache.write(serviceInfo, cacheDir); } // 。。。 return serviceInfo;}
2.服务端
2.1.拉取服务列表接口
在介绍的InstanceController中,提供了拉取服务列表的接口:
/** * Get all instance of input service. * * @param request request * @return list of instance * @throws Exception any error during list */@GetMapping("/list")@Secured(parser = NamingResourceParser.class, action = ActionTypes.READ)public ObjectNode list(HttpServletRequest request) throws Exception { // 从request中获取namespaceId和serviceName String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID); String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME); NamingUtils.checkServiceNameFormat(serviceName); String agent = WebUtils.getUserAgent(request); String clusters = WebUtils.optional(request, "clusters", StringUtils.EMPTY); String clientIP = WebUtils.optional(request, "clientIP", StringUtils.EMPTY); // 获取客户端的 UDP端口 int udpPort = Integer.parseInt(WebUtils.optional(request, "udpPort", "0")); String env = WebUtils.optional(request, "env", StringUtils.EMPTY); boolean isCheck = Boolean.parseBoolean(WebUtils.optional(request, "isCheck", "false")); String app = WebUtils.optional(request, "app", StringUtils.EMPTY); String tenant = WebUtils.optional(request, "tid", StringUtils.EMPTY); boolean healthyOnly = Boolean.parseBoolean(WebUtils.optional(request, "healthyOnly", "false")); // 获取服务列表 return doSrvIpxt(namespaceId, serviceName, agent, clusters, clientIP, udpPort, env, isCheck, app, tenant, healthyOnly);}
进入doSrvIpxt()方法来获取服务列表:
public ObjectNode doSrvIpxt(String namespaceId, String serviceName, String agent, String clusters, String clientIP, int udpPort, String env, boolean isCheck, String app, String tid, boolean healthyOnly) throws Exception { ClientInfo clientInfo = new ClientInfo(agent); ObjectNode result = JacksonUtils.createEmptyJsonNode(); // 获取服务列表信息 Service service = serviceManager.getService(namespaceId, serviceName); long cacheMillis = switchDomain.getDefaultCacheMillis(); // now try to enable the push try { if (udpPort > 0 && pushService.canEnablePush(agent)) { // 添加当前客户端 IP、UDP端口到 PushService 中 pushService .addClient(namespaceId, serviceName, clusters, agent, new InetSocketAddress(clientIP, udpPort), pushDataSource, tid, app); cacheMillis = switchDomain.getPushCacheMillis(serviceName); } } catch (Exception e) { Loggers.SRV_LOG .error("[NACOS-API] failed to added push client {}, {}:{}", clientInfo, clientIP, udpPort, e); cacheMillis = switchDomain.getDefaultCacheMillis(); } if (service == null) { // 如果没找到,返回空 if (Loggers.SRV_LOG.isDebugEnabled()) { Loggers.SRV_LOG.debug("no instance to serve for service: {}", serviceName); } result.put("name", serviceName); result.put("clusters", clusters); result.put("cacheMillis", cacheMillis); result.replace("hosts", JacksonUtils.createEmptyArrayNode()); return result; } // 结果的检测,异常实例的剔除等逻辑省略 // 最终封装结果并返回 。。。 result.replace("hosts", hosts); if (clientInfo.type == ClientInfo.ClientType.JAVA && clientInfo.version.compareTo(VersionUtil.parseVersion("1.0.0")) >= 0) { result.put("dom", serviceName); } else { result.put("dom", NamingUtils.getServiceName(serviceName)); } result.put("name", serviceName); result.put("cacheMillis", cacheMillis); result.put("lastRefTime", System.currentTimeMillis()); result.put("checksum", service.getChecksum()); result.put("useSpecifiedURL", false); result.put("clusters", clusters); result.put("env", env); result.replace("metadata", JacksonUtils.transferToJsonNode(service.getMetadata())); return result;}
2.2.发布服务变更的UDP通知
在上一节中,InstanceController中的doSrvIpxt()方法中,有这样一行代码:
pushService.addClient(namespaceId, serviceName, clusters, agent, new InetSocketAddress(clientIP, udpPort), pushDataSource, tid, app);
其实是把消费者的UDP端口、IP等信息封装为一个PushClient对象,存储PushService中。方便以后服务变更后推送消息。
PushService类本身实现了ApplicationListener接口:
这个是事件-接口,监听的是ServiceChangeEvent(服务变更事件)。当服务列表变化时,就会通知我们:
3.总结
Nacos的服务发现分为两种模式:
模式一:主动拉取模式,消费者定期主动从Nacos拉取服务列表并缓存起来,再服务调用时优先读取本地缓存中的服务列表。
模式二:订阅模式,消费者订阅Nacos中的服务列表,并基于UDP协议来接收服务变更通知。当Nacos中的服务列表更新时,会发送UDP广播给所有订阅者。
与Eureka相比,Nacos的订阅模式服务状态更新更及时,消费者更容易及时发现服务列表的变化,剔除故障服务。
版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。
发表评论
暂时没有评论,来抢沙发吧~