Nacos version 1.3.1
Start the console project
Change the launch configuration to standalone mode
-Dnacos.standalone=true
Nacos service registration and discovery
Start the Nacos server, then start a Nacos client and register it with the server.
Check the console log output.
Client:
2022-01-03 23:01:18.864 INFO 14756 --- [ main] c.a.c.n.registry.NacosServiceRegistry : nacos registry, DEFAULT_GROUP order-service 172.17.115.1:9000 register finished
Open the source of NacosServiceRegistry.
The log message leads us to the register method:
@Override
public void register(Registration registration) {
    if (StringUtils.isEmpty(registration.getServiceId())) {
        log.warn("No service to register for nacos client...");
        return;
    }
    NamingService namingService = namingService();
    String serviceId = registration.getServiceId();
    String group = nacosDiscoveryProperties.getGroup();
    Instance instance = getNacosInstanceFromRegistration(registration);
    try {
        namingService.registerInstance(serviceId, group, instance);
        log.info("nacos registry, {} {} {}:{} register finished", group, serviceId, instance.getIp(), instance.getPort());
    } catch (Exception e) {
        log.error("nacos registry, {} register failed...{},", serviceId, registration.toString(), e);
        // rethrow a RuntimeException if the registration is failed.
        // issue : https://github.com/alibaba/spring-cloud-alibaba/issues/1132
        rethrowRuntimeException(e);
    }
}
Step into
namingService.registerInstance(serviceId, group, instance);
Its source is as follows:
@Override
public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
    NamingUtils.checkInstanceIsLegal(instance);
    String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);
    if (instance.isEphemeral()) {
        BeatInfo beatInfo = beatReactor.buildBeatInfo(groupedServiceName, instance);
        beatReactor.addBeatInfo(groupedServiceName, beatInfo);
    }
    serverProxy.registerService(groupedServiceName, groupName, instance);
}
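Notice that when Nacos registers a service, the name it sends to the Nacos server is groupedServiceName, built by NamingUtils.getGroupedName(serviceName, groupName).
It is the group name and the service name joined with "@@", for example DEFAULT_GROUP@@order-service.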
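To make the naming convention concrete, here is a minimal sketch of joining and splitting a grouped name. The class GroupedNameSketch and its methods are hypothetical stand-ins, not the real NamingUtils, and the DEFAULT_GROUP fallback is an assumption for illustration.

```java
/** Sketch of the "group@@service" convention; a stand-in, not the real NamingUtils. */
final class GroupedNameSketch {
    // Separator placed between group name and service name.
    static final String SPLITTER = "@@";

    /** Joins group and service into one grouped name, group first. */
    static String getGroupedName(String serviceName, String groupName) {
        return groupName + SPLITTER + serviceName;
    }

    /** Returns the group part; falls back to DEFAULT_GROUP (assumed) when no separator is present. */
    static String getGroupName(String groupedName) {
        int idx = groupedName.indexOf(SPLITTER);
        return idx < 0 ? "DEFAULT_GROUP" : groupedName.substring(0, idx);
    }

    /** Returns the service part; a name without a separator is returned unchanged. */
    static String getServiceName(String groupedName) {
        int idx = groupedName.indexOf(SPLITTER);
        return idx < 0 ? groupedName : groupedName.substring(idx + SPLITTER.length());
    }
}
```

This also explains why the server can later recover the group from the service name alone, as service.setGroupName(NamingUtils.getGroupName(serviceName)) does in the server code.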
Finally the instance is registered.
The core call is
serverProxy.registerService(groupedServiceName, groupName, instance);
Step into its source:
public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {
    NAMING_LOGGER.info("[REGISTER-SERVICE] {} registering service {} with instance: {}", namespaceId, serviceName, instance);
    final Map<String, String> params = new HashMap<String, String>(16);
    params.put(CommonParams.NAMESPACE_ID, namespaceId);
    params.put(CommonParams.SERVICE_NAME, serviceName);
    params.put(CommonParams.GROUP_NAME, groupName);
    params.put(CommonParams.CLUSTER_NAME, instance.getClusterName());
    params.put("ip", instance.getIp());
    params.put("port", String.valueOf(instance.getPort()));
    params.put("weight", String.valueOf(instance.getWeight()));
    params.put("enable", String.valueOf(instance.isEnabled()));
    params.put("healthy", String.valueOf(instance.isHealthy()));
    params.put("ephemeral", String.valueOf(instance.isEphemeral()));
    params.put("metadata", JacksonUtils.toJson(instance.getMetadata()));
    reqApi(UtilAndComs.nacosUrlInstance, params, HttpMethod.POST);
}
Focus on the reqApi() method.
public String reqApi(String api, Map<String, String> params, Map<String, String> body, List<String> servers, String method) throws NacosException {
    params.put(CommonParams.NAMESPACE_ID, getNamespaceId());
    if (CollectionUtils.isEmpty(servers) && StringUtils.isBlank(nacosDomain)) {
        throw new NacosException(NacosException.INVALID_PARAM, "no server available");
    }
    NacosException exception = new NacosException();
    if (StringUtils.isNotBlank(nacosDomain)) {
        for (int i = 0; i < maxRetry; i++) {
            try {
                return callServer(api, params, body, nacosDomain, method);
            } catch (NacosException e) {
                exception = e;
                if (NAMING_LOGGER.isDebugEnabled()) {
                    NAMING_LOGGER.debug("request {} failed.", nacosDomain, e);
                }
            }
        }
    } else {
        Random random = new Random(System.currentTimeMillis());
        int index = random.nextInt(servers.size());
        for (int i = 0; i < servers.size(); i++) {
            String server = servers.get(index);
            try {
                return callServer(api, params, body, server, method);
            } catch (NacosException e) {
                exception = e;
                if (NAMING_LOGGER.isDebugEnabled()) {
                    NAMING_LOGGER.debug("request {} failed.", server, e);
                }
            }
            index = (index + 1) % servers.size();
        }
    }
    NAMING_LOGGER.error("request: {} failed, servers: {}, code: {}, msg: {}", api, servers, exception.getErrCode(), exception.getErrMsg());
    throw new NacosException(exception.getErrCode(), "failed to req API:" + api + " after all servers(" + servers + ") tried: " + exception.getMessage());
}
An excerpt from reqApi():
Random random = new Random(System.currentTimeMillis());
int index = random.nextInt(servers.size());
for (int i = 0; i < servers.size(); i++) {
    String server = servers.get(index);
    try {
        return callServer(api, params, body, server, method);
    } catch (NacosException e) {
        exception = e;
        if (NAMING_LOGGER.isDebugEnabled()) {
            NAMING_LOGGER.debug("request {} failed.", server, e);
        }
    }
    index = (index + 1) % servers.size();
}
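A random starting index is picked here. Suppose there are three Nacos servers: random.nextInt(3) yields 0, 1, or 2. The loop then calls the servers in turn. On success it returns immediately and leaves the for loop; on failure it advances with index = (index + 1) % servers.size() and tries the next server. If every server has failed by the time the loop ends, then
throw new NacosException(exception.getErrCode(), "failed to req API:" + api + " after all servers(" + servers + ") tried: " + exception.getMessage());
throws the exception and exits.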
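The failover loop can be isolated into a small sketch. FailoverSketch and callWithFailover are hypothetical names; the start index is passed in instead of being drawn from Random so the behavior is easy to reproduce, and a plain Function stands in for callServer.

```java
import java.util.List;
import java.util.function.Function;

/** Sketch of "pick a start index, then try each server once, wrapping around". */
final class FailoverSketch {
    static String callWithFailover(List<String> servers, int startIndex,
                                   Function<String, String> callServer) {
        RuntimeException last = null;
        int index = startIndex;
        for (int i = 0; i < servers.size(); i++) {
            String server = servers.get(index);
            try {
                // The first successful call ends the loop.
                return callServer.apply(server);
            } catch (RuntimeException e) {
                // Remember the failure and move on to the next server.
                last = e;
            }
            index = (index + 1) % servers.size();
        }
        // Every server was tried once and all of them failed.
        throw new IllegalStateException("all servers failed: " + servers, last);
    }
}
```

With servers [a, b, c], a start index of 1, and b being down, the call is answered by c; a is never contacted because the loop returns on the first success.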
Set a breakpoint inside callServer, restart the service, and inspect the call.
This is the Nacos service registration request.
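To give a feel for what that request carries, here is a rough sketch that turns a parameter map like the one built in registerService into a URL-encoded query string. RegisterRequestSketch and toQuery are hypothetical helpers, not part of the Nacos client, and the real client may encode and transmit the parameters differently.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.util.Map;
import java.util.StringJoiner;

/** Sketch: encode registration parameters as key=value pairs joined by '&'. */
final class RegisterRequestSketch {
    static String toQuery(Map<String, String> params) {
        StringJoiner joiner = new StringJoiner("&");
        for (Map.Entry<String, String> e : params.entrySet()) {
            joiner.add(encode(e.getKey()) + "=" + encode(e.getValue()));
        }
        return joiner.toString();
    }

    private static String encode(String s) {
        try {
            return URLEncoder.encode(s, "UTF-8");
        } catch (UnsupportedEncodingException ex) {
            // UTF-8 is a mandatory charset on every JVM, so this should not happen.
            throw new IllegalStateException(ex);
        }
    }
}
```

Note that the "@@" in the grouped service name is percent-encoded as %40%40 once it travels as a request parameter.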
Now switch to the Nacos server source.
Find InstanceController
and locate
@CanDistro
@PostMapping
@Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
public String register(HttpServletRequest request) throws Exception {
    final String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
    final String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
    final Instance instance = parseInstance(request);
    serviceManager.registerInstance(namespaceId, serviceName, instance);
    return "ok";
}
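this method.
Set a breakpoint here.
You can see that the code parses the registration parameters that were just sent, including the service name.
Follow the call into registerInstance: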
public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {
    createEmptyService(namespaceId, serviceName, instance.isEphemeral());
    Service service = getService(namespaceId, serviceName);
    if (service == null) {
        throw new NacosException(NacosException.INVALID_PARAM, "service not found, namespace: " + namespaceId + ", service: " + serviceName);
    }
    addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);
}
It starts by creating an empty Service. What is this empty Service for?
Look at the comment on the Service class:
We introduce a ‘service --> cluster --> instance’ model, in which service stores a list of clusters, which contain a list of instances.
In other words, a service is used to find its clusters, each cluster is used to find its instances, and the data ends up stored in a ConcurrentHashMap.
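The three-level model can be pictured with nested maps. This is only an illustrative sketch: ServiceModelSketch is a hypothetical class, instances are reduced to "ip:port" strings, and the real Service and Cluster classes carry far more state.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

/** Sketch of the service --> cluster --> instance hierarchy using nested maps. */
final class ServiceModelSketch {
    // service name -> cluster name -> instances (as "ip:port" strings)
    private final Map<String, Map<String, List<String>>> services = new ConcurrentHashMap<>();

    /** Creates the service and cluster levels on demand, then adds the instance. */
    void addInstance(String service, String cluster, String ipPort) {
        services.computeIfAbsent(service, s -> new ConcurrentHashMap<>())
                .computeIfAbsent(cluster, c -> new CopyOnWriteArrayList<>())
                .add(ipPort);
    }

    /** Collects the instances of every cluster that belongs to the service. */
    List<String> allInstances(String service) {
        List<String> result = new ArrayList<>();
        Map<String, List<String>> clusters = services.get(service);
        if (clusters != null) {
            clusters.values().forEach(result::addAll);
        }
        return result;
    }
}
```

Creating the upper levels on demand plays the same role as the empty Service created at the start of registerInstance: the container has to exist before an instance can be attached to it.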
Hence the following source:
if (service == null) {
    Loggers.SRV_LOG.info("creating empty service {}:{}", namespaceId, serviceName);
    service = new Service();
    service.setName(serviceName);
    service.setNamespaceId(namespaceId);
    service.setGroupName(NamingUtils.getGroupName(serviceName));
    // now validate the service. if failed, exception will be thrown
    service.setLastModifiedMillis(System.currentTimeMillis());
    service.recalculateChecksum();
    if (cluster != null) {
        cluster.setService(service);
        service.getClusterMap().put(cluster.getName(), cluster);
    }
    service.validate();
    putServiceAndInit(service);
    if (!local) {
        addOrReplaceService(service);
    }
}
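This creates the Service object the first time the service is seen.
Back in registerInstance(),
addInstance() finally adds the instance being registered to the collection that stores it: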
public void addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips) throws NacosException {
    String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral);
    Service service = getService(namespaceId, serviceName);
    synchronized (service) {
        List<Instance> instanceList = addIpAddresses(service, ephemeral, ips);
        Instances instances = new Instances();
        instances.setInstanceList(instanceList);
        consistencyService.put(key, instances);
    }
}
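The synchronized block locks this Service object and nothing else, so only updates to the same service are serialized.
Next, find onPut, which is called from put; the implementation is in the DistroConsistencyServiceImpl class: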
@Override
public void put(String key, Record value) throws NacosException {
    onPut(key, value);
    taskDispatcher.addTask(key);
}

public void onPut(String key, Record value) {
    if (KeyBuilder.matchEphemeralInstanceListKey(key)) {
        Datum<Instances> datum = new Datum<>();
        datum.value = (Instances) value;
        datum.key = key;
        datum.timestamp.incrementAndGet();
        dataStore.put(key, datum);
    }
    if (!listeners.containsKey(key)) {
        return;
    }
    notifier.addTask(key, ApplyAction.CHANGE);
}
Underneath, the data is actually stored through dataStore.put.
The DataStore class:
@Component
public class DataStore {

    private Map<String, Datum> dataMap = new ConcurrentHashMap<>(1024);

    public void put(String key, Datum value) {
        dataMap.put(key, value);
    }

    public Datum remove(String key) {
        return dataMap.remove(key);
    }

    public Set<String> keys() {
        return dataMap.keySet();
    }

    public Datum get(String key) {
        return dataMap.get(key);
    }

    public boolean contains(String key) {
        return dataMap.containsKey(key);
    }

    public Map<String, Datum> batchGet(List<String> keys) {
        Map<String, Datum> map = new HashMap<>(128);
        for (String key : keys) {
            Datum datum = dataMap.get(key);
            if (datum == null) {
                continue;
            }
            map.put(key, datum);
        }
        return map;
    }

    public int getInstanceCount() {
        int count = 0;
        for (Map.Entry<String, Datum> entry : dataMap.entrySet()) {
            try {
                Datum instancesDatum = entry.getValue();
                if (instancesDatum.value instanceof Instances) {
                    count += ((Instances) instancesDatum.value).getInstanceList().size();
                }
            } catch (Exception ignore) {
            }
        }
        return count;
    }

    public Map<String, Datum> getDataMap() {
        return dataMap;
    }
}
This makes it clear: dataStore is really just a wrapper around a ConcurrentHashMap.
That completes the analysis of how Nacos service registration works under the hood.
Nacos service discovery source analysis
The source is as follows:
public String queryList(String serviceName, String clusters, int udpPort, boolean healthyOnly) throws NacosException {
    final Map<String, String> params = new HashMap<String, String>(8);
    params.put(CommonParams.NAMESPACE_ID, namespaceId);
    params.put(CommonParams.SERVICE_NAME, serviceName);
    params.put("clusters", clusters);
    params.put("udpPort", String.valueOf(udpPort));
    params.put("clientIP", NetUtils.localIP());
    params.put("healthyOnly", String.valueOf(healthyOnly));
    return reqApi(UtilAndComs.nacosUrlBase + "/instance/list", params, HttpMethod.GET);
}
Follow the call chain.
The source is as follows:
public ServiceInfo processServiceJson(String json) {
    ServiceInfo serviceInfo = JacksonUtils.toObj(json, ServiceInfo.class);
    ServiceInfo oldService = serviceInfoMap.get(serviceInfo.getKey());
    if (pushEmptyProtection && !serviceInfo.validate()) {
        //empty or error push, just ignore
        return oldService;
    }
    boolean changed = false;
    if (oldService != null) {
        if (oldService.getLastRefTime() > serviceInfo.getLastRefTime()) {
            NAMING_LOGGER.warn("out of date data received, old-t: " + oldService.getLastRefTime() + ", new-t: " + serviceInfo.getLastRefTime());
        }
        serviceInfoMap.put(serviceInfo.getKey(), serviceInfo);
        Map<String, Instance> oldHostMap = new HashMap<String, Instance>(oldService.getHosts().size());
        for (Instance host : oldService.getHosts()) {
            oldHostMap.put(host.toInetAddr(), host);
        }
        Map<String, Instance> newHostMap = new HashMap<String, Instance>(serviceInfo.getHosts().size());
        for (Instance host : serviceInfo.getHosts()) {
            newHostMap.put(host.toInetAddr(), host);
        }
        Set<Instance> modHosts = new HashSet<Instance>();
        Set<Instance> newHosts = new HashSet<Instance>();
        Set<Instance> remvHosts = new HashSet<Instance>();
        List<Map.Entry<String, Instance>> newServiceHosts = new ArrayList<Map.Entry<String, Instance>>(newHostMap.entrySet());
        for (Map.Entry<String, Instance> entry : newServiceHosts) {
            Instance host = entry.getValue();
            String key = entry.getKey();
            if (oldHostMap.containsKey(key) && !StringUtils.equals(host.toString(), oldHostMap.get(key).toString())) {
                modHosts.add(host);
                continue;
            }
            if (!oldHostMap.containsKey(key)) {
                newHosts.add(host);
            }
        }
        for (Map.Entry<String, Instance> entry : oldHostMap.entrySet()) {
            Instance host = entry.getValue();
            String key = entry.getKey();
            if (newHostMap.containsKey(key)) {
                continue;
            }
            if (!newHostMap.containsKey(key)) {
                remvHosts.add(host);
            }
        }
        if (newHosts.size() > 0) {
            changed = true;
            NAMING_LOGGER.info("new ips(" + newHosts.size() + ") service: " + serviceInfo.getKey() + " -> " + JacksonUtils.toJson(newHosts));
        }
        if (remvHosts.size() > 0) {
            changed = true;
            NAMING_LOGGER.info("removed ips(" + remvHosts.size() + ") service: " + serviceInfo.getKey() + " -> " + JacksonUtils.toJson(remvHosts));
        }
        if (modHosts.size() > 0) {
            changed = true;
            updateBeatInfo(modHosts);
            NAMING_LOGGER.info("modified ips(" + modHosts.size() + ") service: " + serviceInfo.getKey() + " -> " + JacksonUtils.toJson(modHosts));
        }
        serviceInfo.setJsonFromServer(json);
        if (newHosts.size() > 0 || remvHosts.size() > 0 || modHosts.size() > 0) {
            NotifyCenter.publishEvent(new InstancesChangeEvent(serviceInfo.getName(), serviceInfo.getGroupName(), serviceInfo.getClusters(), serviceInfo.getHosts()));
            DiskCache.write(serviceInfo, cacheDir);
        }
    } else {
        changed = true;
        NAMING_LOGGER.info("init new ips(" + serviceInfo.ipCount() + ") service: " + serviceInfo.getKey() + " -> " + JacksonUtils.toJson(serviceInfo.getHosts()));
        serviceInfoMap.put(serviceInfo.getKey(), serviceInfo);
        NotifyCenter.publishEvent(new InstancesChangeEvent(serviceInfo.getName(), serviceInfo.getGroupName(), serviceInfo.getClusters(), serviceInfo.getHosts()));
        serviceInfo.setJsonFromServer(json);
        DiskCache.write(serviceInfo, cacheDir);
    }
    MetricsMonitor.getServiceInfoMapSizeMonitor().set(serviceInfoMap.size());
    if (changed) {
        NAMING_LOGGER.info("current ips:(" + serviceInfo.ipCount() + ") service: " + serviceInfo.getKey() + " -> " + JacksonUtils.toJson(serviceInfo.getHosts()));
    }
    return serviceInfo;
}
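Now start another service and register it with Nacos. A scheduled thread pool underneath polls at a fixed interval (every 30s here) to check whether new services have registered, so execution now reaches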
if (newHosts.size() > 0 || remvHosts.size() > 0 || modHosts.size() > 0) {
    NotifyCenter.publishEvent(new InstancesChangeEvent(serviceInfo.getName(), serviceInfo.getGroupName(), serviceInfo.getClusters(), serviceInfo.getHosts()));
    DiskCache.write(serviceInfo, cacheDir);
}
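This is an event-driven design: underneath it follows the publish-subscribe pattern, publishing an InstancesChangeEvent whenever hosts are added, removed, or modified.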
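The comparison that decides whether an event is published boils down to diffing two host maps. Below is a simplified sketch, assuming hosts are keyed by "ip:port" and each value is a string describing the instance; HostDiffSketch is a hypothetical class, not the Nacos implementation.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Sketch: classify hosts as added, removed, or modified between two snapshots. */
final class HostDiffSketch {
    final Set<String> added = new HashSet<>();
    final Set<String> removed = new HashSet<>();
    final Set<String> modified = new HashSet<>();

    static HostDiffSketch diff(Map<String, String> oldHosts, Map<String, String> newHosts) {
        HostDiffSketch d = new HostDiffSketch();
        for (Map.Entry<String, String> e : newHosts.entrySet()) {
            if (!oldHosts.containsKey(e.getKey())) {
                d.added.add(e.getKey());      // only in the new snapshot
            } else if (!e.getValue().equals(oldHosts.get(e.getKey()))) {
                d.modified.add(e.getKey());   // same address, different description
            }
        }
        for (String key : oldHosts.keySet()) {
            if (!newHosts.containsKey(key)) {
                d.removed.add(key);           // only in the old snapshot
            }
        }
        return d;
    }

    /** An event would be published only when at least one set is non-empty. */
    boolean changed() {
        return !(added.isEmpty() && removed.isEmpty() && modified.isEmpty());
    }
}
```

Publishing only when changed() is true keeps subscribers from being notified on every poll that returns identical data.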
Finally, look at
public void registerListener(String serviceName, String clusters, EventListener listener) {
    String key = ServiceInfo.getKey(serviceName, clusters);
    ConcurrentHashSet<EventListener> eventListeners = listenerMap.get(key);
    if (eventListeners == null) {
        synchronized (lock) {
            eventListeners = listenerMap.get(key);
            if (eventListeners == null) {
                eventListeners = new ConcurrentHashSet<EventListener>();
                listenerMap.put(key, eventListeners);
            }
        }
    }
    eventListeners.add(listener);
}
Here the listener is added to the set kept for this service in a ConcurrentHashMap (listenerMap), and that completes the subscription.
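For comparison, the same "one listener set per service key" registry can be sketched with ConcurrentHashMap.computeIfAbsent instead of the explicit double-checked lock. This is a swapped-in technique rather than what the code above does, and ListenerRegistrySketch with its Consumer-based listeners is purely hypothetical.

```java
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

/** Sketch of a publish-subscribe registry keyed by service. */
final class ListenerRegistrySketch {
    private final Map<String, Set<Consumer<List<String>>>> listenerMap = new ConcurrentHashMap<>();

    /** Creates the listener set for the key atomically if needed, then adds the listener. */
    void registerListener(String serviceKey, Consumer<List<String>> listener) {
        listenerMap.computeIfAbsent(serviceKey, k -> ConcurrentHashMap.newKeySet()).add(listener);
    }

    /** Delivers the host list to every listener of the key; returns how many were notified. */
    int publish(String serviceKey, List<String> hosts) {
        Set<Consumer<List<String>>> listeners = listenerMap.get(serviceKey);
        if (listeners == null) {
            return 0;
        }
        listeners.forEach(l -> l.accept(hosts));
        return listeners.size();
    }
}
```

computeIfAbsent gives the same guarantee as the synchronized block: two threads registering for the same key end up sharing one set.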
Nacos service deregistration (going offline) source
Check the log:
2022-01-04 19:49:20.525 INFO 3532 --- [extShutdownHook] c.a.c.n.registry.NacosServiceRegistry : De-registering from Nacos Server now...
Find the corresponding source:
@Override
public void deregister(Registration registration) {
    log.info("De-registering from Nacos Server now...");
    if (StringUtils.isEmpty(registration.getServiceId())) {
        log.warn("No dom to de-register for nacos client...");
        return;
    }
    NamingService namingService = namingService();
    String serviceId = registration.getServiceId();
    String group = nacosDiscoveryProperties.getGroup();
    try {
        namingService.deregisterInstance(serviceId, group, registration.getHost(), registration.getPort(), nacosDiscoveryProperties.getClusterName());
    } catch (Exception e) {
        log.error("ERR_NACOS_DEREGISTER, de-register failed...{},", registration.toString(), e);
    }
    log.info("De-registration finished.");
}
The key call is
namingService.deregisterInstance(serviceId, group, registration.getHost(), registration.getPort(), nacosDiscoveryProperties.getClusterName());
Step all the way through this method and it again ends up in reqApi. The source is as follows:
public void deregisterService(String serviceName, Instance instance) throws NacosException {
    NAMING_LOGGER.info("[DEREGISTER-SERVICE] {} deregistering service {} with instance: {}", namespaceId, serviceName, instance);
    final Map<String, String> params = new HashMap<String, String>(8);
    params.put(CommonParams.NAMESPACE_ID, namespaceId);
    params.put(CommonParams.SERVICE_NAME, serviceName);
    params.put(CommonParams.CLUSTER_NAME, instance.getClusterName());
    params.put("ip", instance.getIp());
    params.put("port", String.valueOf(instance.getPort()));
    params.put("ephemeral", String.valueOf(instance.isEphemeral()));
    reqApi(UtilAndComs.nacosUrlInstance, params, HttpMethod.DELETE);
}
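In the end, this again sends a REST request to the server.
Now set a breakpoint on the server and restart the client service.
What we observe:
In production, a service usually stops abnormally, for example because of CPU exhaustion or a memory leak. Nacos behaves like Eureka in this case: after an abnormal stop the client never sends the deregistration request, so Nacos cannot remove the instance right away, and it stays registered until the heartbeat check described later removes it.
So how is this handled?
When a consumer calls an instance and the call fails, the client-side load balancer fails over and switches to another node.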
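Nacos heartbeat renewal source
By default the Nacos client sends a heartbeat every 5s to renew its registration. The renewal time is set to the current system time.
The BeatReactor class first initializes a scheduled thread pool; the source is as follows: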
public BeatReactor(NamingProxy serverProxy, int threadCount) {
    this.serverProxy = serverProxy;
    this.executorService = new ScheduledThreadPoolExecutor(threadCount, new ThreadFactory() {
        @Override
        public Thread newThread(Runnable r) {
            Thread thread = new Thread(r);
            thread.setDaemon(true);
            thread.setName("com.alibaba.nacos.naming.beat.sender");
            return thread;
        }
    });
}
Now look at the BeatTask class:
class BeatTask implements Runnable {

    BeatInfo beatInfo;

    public BeatTask(BeatInfo beatInfo) {
        this.beatInfo = beatInfo;
    }

    @Override
    public void run() {
        if (beatInfo.isStopped()) {
            return;
        }
        long nextTime = beatInfo.getPeriod();
        try {
            JsonNode result = serverProxy.sendBeat(beatInfo, BeatReactor.this.lightBeatEnabled);
            long interval = result.get("clientBeatInterval").asLong();
            boolean lightBeatEnabled = false;
            if (result.has(CommonParams.LIGHT_BEAT_ENABLED)) {
                lightBeatEnabled = result.get(CommonParams.LIGHT_BEAT_ENABLED).asBoolean();
            }
            BeatReactor.this.lightBeatEnabled = lightBeatEnabled;
            if (interval > 0) {
                nextTime = interval;
            }
            int code = NamingResponseCode.OK;
            if (result.has(CommonParams.CODE)) {
                code = result.get(CommonParams.CODE).asInt();
            }
            if (code == NamingResponseCode.RESOURCE_NOT_FOUND) {
                Instance instance = new Instance();
                instance.setPort(beatInfo.getPort());
                instance.setIp(beatInfo.getIp());
                instance.setWeight(beatInfo.getWeight());
                instance.setMetadata(beatInfo.getMetadata());
                instance.setClusterName(beatInfo.getCluster());
                instance.setServiceName(beatInfo.getServiceName());
                instance.setInstanceId(instance.getInstanceId());
                instance.setEphemeral(true);
                try {
                    serverProxy.registerService(beatInfo.getServiceName(), NamingUtils.getGroupName(beatInfo.getServiceName()), instance);
                } catch (Exception ignore) {
                }
            }
        } catch (NacosException ex) {
            NAMING_LOGGER.error("[CLIENT-BEAT] failed to send beat: {}, code: {}, msg: {}", JacksonUtils.toJson(beatInfo), ex.getErrCode(), ex.getErrMsg());
        }
        executorService.schedule(new BeatTask(beatInfo), nextTime, TimeUnit.MILLISECONDS);
    }
}
Here, the line JsonNode result = serverProxy.sendBeat(beatInfo, BeatReactor.this.lightBeatEnabled);
is what sends the heartbeat.
Let's see how sendBeat() is implemented:
public JsonNode sendBeat(BeatInfo beatInfo, boolean lightBeatEnabled) throws NacosException {
    if (NAMING_LOGGER.isDebugEnabled()) {
        NAMING_LOGGER.debug("[BEAT] {} sending beat to server: {}", namespaceId, beatInfo.toString());
    }
    Map<String, String> params = new HashMap<String, String>(8);
    Map<String, String> bodyMap = new HashMap<String, String>(2);
    if (!lightBeatEnabled) {
        bodyMap.put("beat", JacksonUtils.toJson(beatInfo));
    }
    params.put(CommonParams.NAMESPACE_ID, namespaceId);
    params.put(CommonParams.SERVICE_NAME, beatInfo.getServiceName());
    params.put(CommonParams.CLUSTER_NAME, beatInfo.getCluster());
    params.put("ip", beatInfo.getIp());
    params.put("port", String.valueOf(beatInfo.getPort()));
    String result = reqApi(UtilAndComs.nacosUrlBase + "/instance/beat", params, bodyMap, HttpMethod.PUT);
    return JacksonUtils.toObj(result);
}
As you can see, this too sends a REST request through the reqApi method.
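The scheduling pattern in BeatTask, a one-shot task that re-submits itself with a delay the server may override, can be sketched on its own. BeatSketch, nextDelay, and start are hypothetical names, and a LongSupplier stands in for the real sendBeat call by returning the server-suggested interval in milliseconds.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.LongSupplier;

/** Sketch of a self-rescheduling heartbeat task. */
final class BeatSketch {
    /** Uses the server-provided interval when it is positive, otherwise the default period. */
    static long nextDelay(long defaultPeriodMs, long serverIntervalMs) {
        return serverIntervalMs > 0 ? serverIntervalMs : defaultPeriodMs;
    }

    /** Starts beating on a daemon thread; shut the returned executor down to stop. */
    static ScheduledExecutorService start(long periodMs, LongSupplier sendBeat) {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "beat-sender-sketch");
            t.setDaemon(true);
            return t;
        });
        schedule(executor, periodMs, sendBeat, periodMs);
        return executor;
    }

    private static void schedule(ScheduledExecutorService executor, long periodMs,
                                 LongSupplier sendBeat, long delayMs) {
        executor.schedule(() -> {
            long next = periodMs;
            try {
                next = nextDelay(periodMs, sendBeat.getAsLong());
            } catch (RuntimeException e) {
                // A failed beat keeps the default period, so the next attempt still happens.
            }
            if (!executor.isShutdown()) {
                schedule(executor, periodMs, sendBeat, next);
            }
        }, delayMs, TimeUnit.MILLISECONDS);
    }
}
```

Rescheduling after each run, rather than using a fixed-rate schedule, is what lets the server change the heartbeat interval at runtime through the clientBeatInterval field of its response.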
Now follow the request to the server:
@CanDistro
@PutMapping("/beat")
@Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
public ObjectNode beat(HttpServletRequest request) throws Exception {
    ObjectNode result = JacksonUtils.createEmptyJsonNode();
    result.put("clientBeatInterval", switchDomain.getClientBeatInterval());
    String beat = WebUtils.optional(request, "beat", StringUtils.EMPTY);
    RsInfo clientBeat = null;
    if (StringUtils.isNotBlank(beat)) {
        clientBeat = JacksonUtils.toObj(beat, RsInfo.class);
    }
    String clusterName = WebUtils.optional(request, CommonParams.CLUSTER_NAME, UtilsAndCommons.DEFAULT_CLUSTER_NAME);
    String ip = WebUtils.optional(request, "ip", StringUtils.EMPTY);
    int port = Integer.parseInt(WebUtils.optional(request, "port", "0"));
    if (clientBeat != null) {
        if (StringUtils.isNotBlank(clientBeat.getCluster())) {
            clusterName = clientBeat.getCluster();
        } else {
            // fix #2533
            clientBeat.setCluster(clusterName);
        }
        ip = clientBeat.getIp();
        port = clientBeat.getPort();
    }
    String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
    String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
    Loggers.SRV_LOG.debug("[CLIENT-BEAT] full arguments: beat: {}, serviceName: {}", clientBeat, serviceName);
    Instance instance = serviceManager.getInstance(namespaceId, serviceName, clusterName, ip, port);
    if (instance == null) {
        if (clientBeat == null) {
            result.put(CommonParams.CODE, NamingResponseCode.RESOURCE_NOT_FOUND);
            return result;
        }
        Loggers.SRV_LOG.warn("[CLIENT-BEAT] The instance has been removed for health mechanism, "
                + "perform data compensation operations, beat: {}, serviceName: {}", clientBeat, serviceName);
        instance = new Instance();
        instance.setPort(clientBeat.getPort());
        instance.setIp(clientBeat.getIp());
        instance.setWeight(clientBeat.getWeight());
        instance.setMetadata(clientBeat.getMetadata());
        instance.setClusterName(clusterName);
        instance.setServiceName(serviceName);
        instance.setInstanceId(instance.getInstanceId());
        instance.setEphemeral(clientBeat.isEphemeral());
        serviceManager.registerInstance(namespaceId, serviceName, instance);
    }
    Service service = serviceManager.getService(namespaceId, serviceName);
    if (service == null) {
        throw new NacosException(NacosException.SERVER_ERROR, "service not found: " + serviceName + "@" + namespaceId);
    }
    if (clientBeat == null) {
        clientBeat = new RsInfo();
        clientBeat.setIp(ip);
        clientBeat.setPort(port);
        clientBeat.setCluster(clusterName);
    }
    service.processClientBeat(clientBeat);
    result.put(CommonParams.CODE, NamingResponseCode.OK);
    result.put("clientBeatInterval", instance.getInstanceHeartBeatInterval());
    result.put(SwitchEntry.LIGHT_BEAT_ENABLED, switchDomain.isLightBeatEnabled());
    return result;
}
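Set a breakpoint here.
You will find that the client calls this endpoint at regular intervals to renew its heartbeat.
On entry, the method reads the ip, port, service name, and so on.
With these in hand, Nacos checks whether instance is null. If it is null and the request carried the full beat information, Nacos registers the instance again for you (if the request was a light beat without that information, it returns RESOURCE_NOT_FOUND instead and the client re-registers itself).
That is the
serviceManager.registerInstance(namespaceId, serviceName, instance);
piece of logic.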
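The decision the server makes on each beat can be condensed into a small sketch. BeatHandlerSketch is hypothetical, instances are reduced to an "ip:port" key with a last-beat timestamp, and the two numeric codes are assumed values standing in for NamingResponseCode.OK and NamingResponseCode.RESOURCE_NOT_FOUND.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Sketch of the server-side beat decision: renew, re-register, or ask the client to register. */
final class BeatHandlerSketch {
    static final int OK = 10200;                 // assumed value, for illustration only
    static final int RESOURCE_NOT_FOUND = 20404; // assumed value, for illustration only

    // "ip:port" -> time of the last heartbeat in milliseconds
    private final Map<String, Long> lastBeatByInstance = new ConcurrentHashMap<>();

    /**
     * @param fullBeat true when the request carries the complete beat information,
     *                 false for a light beat that only carries ip and port
     */
    int onBeat(String ipPort, boolean fullBeat, long nowMs) {
        if (!lastBeatByInstance.containsKey(ipPort) && !fullBeat) {
            // Unknown instance and not enough data to rebuild it: the client must re-register.
            return RESOURCE_NOT_FOUND;
        }
        // Registers the instance when it is missing, renews it otherwise.
        lastBeatByInstance.put(ipPort, nowMs);
        return OK;
    }

    boolean isRegistered(String ipPort) {
        return lastBeatByInstance.containsKey(ipPort);
    }
}
```

Either way, an instance that was removed by the health check but is in fact still alive finds its way back into the registry on its next heartbeat.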
Next, look at
service.processClientBeat(clientBeat);
The source is as follows:
public void processClientBeat(final RsInfo rsInfo) {
    ClientBeatProcessor clientBeatProcessor = new ClientBeatProcessor();
    clientBeatProcessor.setService(this);
    clientBeatProcessor.setRsInfo(rsInfo);
    HealthCheckReactor.scheduleNow(clientBeatProcessor);
}
scheduleNow hands the task to a scheduled executor (backed by a delay queue) with a delay of 0; its implementation is
public static ScheduledFuture<?> scheduleNow(Runnable task) {
    return GlobalExecutor.scheduleNamingHealth(task, 0, TimeUnit.MILLISECONDS);
}
Execution then reaches the run method of ClientBeatProcessor:
@Override
public void run() {
    Service service = this.service;
    if (Loggers.EVT_LOG.isDebugEnabled()) {
        Loggers.EVT_LOG.debug("[CLIENT-BEAT] processing beat: {}", rsInfo.toString());
    }
    String ip = rsInfo.getIp();
    String clusterName = rsInfo.getCluster();
    int port = rsInfo.getPort();
    Cluster cluster = service.getClusterMap().get(clusterName);
    List<Instance> instances = cluster.allIPs(true);
    for (Instance instance : instances) {
        if (instance.getIp().equals(ip) && instance.getPort() == port) {
            if (Loggers.EVT_LOG.isDebugEnabled()) {
                Loggers.EVT_LOG.debug("[CLIENT-BEAT] refresh beat: {}", rsInfo.toString());
            }
            instance.setLastBeat(System.currentTimeMillis());
            if (!instance.isMarked()) {
                if (!instance.isHealthy()) {
                    instance.setHealthy(true);
                    Loggers.EVT_LOG.info("service: {} {POS} {IP-ENABLED} valid: {}:{}@{}, region: {}, msg: client beat ok",
                            cluster.getService().getName(), ip, port, cluster.getName(), UtilsAndCommons.LOCALHOST_SITE);
                    getPushService().serviceChanged(service);
                }
            }
        }
    }
}
Inside this for loop, instance.setLastBeat(System.currentTimeMillis());
sets the time of the instance's last heartbeat renewal.
Now move to the com.alibaba.nacos.naming.core.Service#init() method; the source is as follows:
public void init() {
    HealthCheckReactor.scheduleCheck(clientBeatCheckTask);
    for (Map.Entry<String, Cluster> entry : clusterMap.entrySet()) {
        entry.getValue().setService(this);
        entry.getValue().init();
    }
}
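This is where the Nacos server starts its heartbeat check.
Its purpose is to detect expired addresses held by the Nacos server.
The ClientBeatCheckTask passed in is also a Runnable task.
In its run method, focus on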
// then remove obsolete instances:
for (Instance instance : instances) {
    if (instance.isMarked()) {
        continue;
    }
    if (System.currentTimeMillis() - instance.getLastBeat() > instance.getIpDeleteTimeout()) {
        // delete instance
        Loggers.SRV_LOG.info("[AUTO-DELETE-IP] service: {}, ip: {}", service.getName(), JacksonUtils.toJson(instance));
        deleteIp(instance);
    }
}
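if (System.currentTimeMillis() - instance.getLastBeat() > instance.getIpDeleteTimeout())
This checks whether the current system time minus the instance's last heartbeat renewal time exceeds the delete timeout; if it does, the instance is removed by calling deleteIp(instance).
deleteIp() is implemented as follows: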
private void deleteIp(Instance instance) {
    try {
        NamingProxy.Request request = NamingProxy.Request.newRequest();
        request.appendParam("ip", instance.getIp()).appendParam("port", String.valueOf(instance.getPort()))
                .appendParam("ephemeral", "true").appendParam("clusterName", instance.getClusterName())
                .appendParam("serviceName", service.getName()).appendParam("namespaceId", service.getNamespaceId());
        String url = "http://127.0.0.1:" + ApplicationUtils.getPort() + ApplicationUtils.getContextPath()
                + UtilsAndCommons.NACOS_NAMING_CONTEXT + "/instance?" + request.toUrl();
        // delete instance asynchronously:
        HttpClient.asyncHttpDelete(url, null, null, new AsyncCompletionHandler() {
            @Override
            public Object onCompleted(Response response) throws Exception {
                if (response.getStatusCode() != HttpURLConnection.HTTP_OK) {
                    Loggers.SRV_LOG.error("[IP-DEAD] failed to delete ip automatically, ip: {}, caused {}, resp code: {}",
                            instance.toJson(), response.getResponseBody(), response.getStatusCode());
                }
                return null;
            }
        });
    } catch (Exception e) {
        Loggers.SRV_LOG.error("[IP-DEAD] failed to delete ip automatically, ip: {}, error: {}", instance.toJson(), e);
    }
}
As you can see,
HttpClient.asyncHttpDelete(url, null, null, new AsyncCompletionHandler() {
    @Override
    public Object onCompleted(Response response) throws Exception {
        if (response.getStatusCode() != HttpURLConnection.HTTP_OK) {
            Loggers.SRV_LOG.error("[IP-DEAD] failed to delete ip automatically, ip: {}, caused {}, resp code: {}",
                    instance.toJson(), response.getResponseBody(), response.getStatusCode());
        }
        return null;
    }
});
the server calls its own delete endpoint here, that is, the deregister method:
@CanDistro
@DeleteMapping
@Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
public String deregister(HttpServletRequest request) throws Exception
Now let's see what the first for loop does:
// first set health status of instances:
for (Instance instance : instances) {
    if (System.currentTimeMillis() - instance.getLastBeat() > instance.getInstanceHeartBeatTimeOut()) {
        if (!instance.isMarked()) {
            if (instance.isHealthy()) {
                instance.setHealthy(false);
                Loggers.EVT_LOG.info("{POS} {IP-DISABLED} valid: {}:{}@{}@{}, region: {}, msg: client timeout after {}, last beat: {}",
                        instance.getIp(), instance.getPort(), instance.getClusterName(), service.getName(),
                        UtilsAndCommons.LOCALHOST_SITE, instance.getInstanceHeartBeatTimeOut(), instance.getLastBeat());
                getPushService().serviceChanged(service);
                ApplicationUtils.publishEvent(new InstanceHeartbeatTimeoutEvent(this, instance));
            }
        }
    }
}
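Here, if an instance has not renewed within 15s it is not removed; it is marked unhealthy and a Spring event is published.
The notification call: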
ApplicationUtils.publishEvent(new InstanceHeartbeatTimeoutEvent(this, instance));
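Now look at how the service protection works:
if (!getGlobalConfig().isExpireInstance()) { return; }
This is similar to Eureka's self-protection. If expireInstance is set to false, the task returns at this point, so the heartbeat check never reaches the loop below that removes instances.
So why are there two loops?
The first loop handles renewals that have timed out: it marks the instance unhealthy and notifies through a Spring event callback, so we know which instances have missed their renewal. The first loop uses the 15s heartbeat timeout; the second uses the 30s delete timeout.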
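The two-stage check can be sketched as follows, with the clock passed in as a parameter so the thresholds are easy to exercise. BeatCheckSketch is a hypothetical class; the 15s and 30s values are the ones mentioned above, and the expireInstance flag mirrors the isExpireInstance() switch.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Sketch: mark instances unhealthy after 15s without a beat, remove them after 30s. */
final class BeatCheckSketch {
    static final long HEARTBEAT_TIMEOUT_MS = 15_000L;
    static final long DELETE_TIMEOUT_MS = 30_000L;

    static final class Inst {
        final long lastBeat;
        boolean healthy = true;

        Inst(long lastBeat) {
            this.lastBeat = lastBeat;
        }
    }

    final Map<String, Inst> instances = new ConcurrentHashMap<>();
    boolean expireInstance = true;

    void check(long nowMs) {
        // First pass: only flip the health flag, nothing is removed here.
        for (Inst inst : instances.values()) {
            if (nowMs - inst.lastBeat > HEARTBEAT_TIMEOUT_MS) {
                inst.healthy = false;
            }
        }
        // Protection switch: when expiry is disabled, stop before the removal pass.
        if (!expireInstance) {
            return;
        }
        // Second pass: drop instances that have been silent longer than the delete timeout.
        instances.values().removeIf(inst -> nowMs - inst.lastBeat > DELETE_TIMEOUT_MS);
    }
}
```

An instance whose last beat was at time 0 is unhealthy but still listed when checked at 16s, and gone when checked at 31s, unless expireInstance is false.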