1、获取服务信息2、故障转移3、从缓存中获取服务信息4、从远程更新服务信息5、服务正在更新中6、定时更新服务
1、获取服务信息HostReactor 中的 getServiceInfo( ) 方法
public ServiceInfo getServiceInfo(final String serviceName, final String clusters) { NAMING_LOGGER.debug("failover-mode: " + failoverReactor.isFailoverSwitch()); String key = ServiceInfo.getKey(serviceName, clusters); //开启故障转移模式,从failoverReactor 获取服务列表 if (failoverReactor.isFailoverSwitch()) { return failoverReactor.getService(key); } //从缓存中获取服务列表 ServiceInfo serviceObj = getServiceInfo0(serviceName, clusters); if (null == serviceObj) { //服务信息不存在时,创建新的服务信息,并放入缓存serviceInfoMap serviceObj = new ServiceInfo(serviceName, clusters); serviceInfoMap.put(serviceObj.getKey(), serviceObj); //记录服务正在更新中 updatingMap.put(serviceName, new Object()); //更新服务 updateServiceNow(serviceName, clusters); //服务更新完删除缓存 updatingMap.remove(serviceName); } else if (updatingMap.containsKey(serviceName)) { //服务正在更新中,wait 5s UPDATE_HOLD_INTERVAL = 5000L if (UPDATE_HOLD_INTERVAL > 0) { // hold a moment waiting for update finish synchronized (serviceObj) { try { serviceObj.wait(UPDATE_HOLD_INTERVAL); } catch (InterruptedException e) { NAMING_LOGGER .error("[getServiceInfo] serviceName:" + serviceName + ", clusters:" + clusters, e); } } } } //服务更新任务不存在时,创建服务更新任务并调度 scheduleUpdateIfAbsent(serviceName, clusters); return serviceInfoMap.get(serviceObj.getKey()); }2、故障转移
//failover-mode参数是由SwitchRefresher来处理的 public boolean isFailoverSwitch() { return Boolean.parseBoolean(switchParams.get("failover-mode")); } //serviceMap是由FailoverFileReader来处理的 public ServiceInfo getService(String key) { ServiceInfo serviceInfo = serviceMap.get(key); if (serviceInfo == null) { serviceInfo = new ServiceInfo(); serviceInfo.setName(key); } return serviceInfo; }3、从缓存中获取服务信息
private ServiceInfo getServiceInfo0(String serviceName, String clusters) { String key = ServiceInfo.getKey(serviceName, clusters); //从 serviceInfoMap 中获取服务信息 return serviceInfoMap.get(key); }4、从远程更新服务信息
当缓存中不存在服务信息时,更新服务
(1)updateServiceNow( )
private void updateServiceNow(String serviceName, String clusters) { try { updateService(serviceName, clusters); } catch (NacosException e) { NAMING_LOGGER.error("[NA] failed to update serviceName: " + serviceName, e); } }
(2)updateService( )
public void updateService(String serviceName, String clusters) throws NacosException { //获取远程服务列表前,再次查询缓存中是否存在服务信息,如果存在,在finally里将等待服务列表的线程唤醒,在下面第5步中讲解等待的线程 ServiceInfo oldService = getServiceInfo0(serviceName, clusters); try { String result = serverProxy.queryList(serviceName, clusters, pushReceiver.getUdpPort(), false); if (StringUtils.isNotEmpty(result)) { processServiceJson(result); } } finally { if (oldService != null) { synchronized (oldService) { oldService.notifyAll(); } } } }
(3)queryList( )
调用nacos服务端远程接口获取对应的服务信息
public String queryList(String serviceName, String clusters, int udpPort, boolean healthyOnly) throws NacosException { final Mapparams = new HashMap (8); params.put(CommonParams.NAMESPACE_ID, namespaceId); params.put(CommonParams.SERVICE_NAME, serviceName); params.put("clusters", clusters); params.put("udpPort", String.valueOf(udpPort)); params.put("clientIP", NetUtils.localIP()); params.put("healthyOnly", String.valueOf(healthyOnly)); return reqApi(UtilAndComs.nacosUrlbase + "/instance/list", params, HttpMethod.GET); }
(4)processServiceJson( )
处理上一步的返回服务信息
public ServiceInfo processServiceJson(String json) { ServiceInfo serviceInfo = JacksonUtils.toObj(json, ServiceInfo.class); ServiceInfo oldService = serviceInfoMap.get(serviceInfo.getKey()); //空或者返回的服务信息异常时,返回旧的服务信息 if (pushEmptyProtection && !serviceInfo.validate()) { //empty or error push, just ignore return oldService; } boolean changed = false; if (oldService != null) { if (oldService.getLastRefTime() > serviceInfo.getLastRefTime()) { NAMING_LOGGER.warn("out of date data received, old-t: " + oldService.getLastRefTime() + ", new-t: " + serviceInfo.getLastRefTime()); } //将新的服务信息放入缓存 serviceInfoMap.put(serviceInfo.getKey(), serviceInfo); //旧的服务主机Map Map5、服务正在更新中oldHostMap = new HashMap (oldService.getHosts().size()); for (Instance host : oldService.getHosts()) { oldHostMap.put(host.toInetAddr(), host); } //新的服务主机Map Map newHostMap = new HashMap (serviceInfo.getHosts().size()); for (Instance host : serviceInfo.getHosts()) { newHostMap.put(host.toInetAddr(), host); } //修改了的主机 Set modHosts = new HashSet (); //新上线的主机 Set newHosts = new HashSet (); //下线的主机 Set remvHosts = new HashSet (); List > newServiceHosts = new ArrayList >( newHostMap.entrySet()); for (Map.Entry entry : newServiceHosts) { Instance host = entry.getValue(); String key = entry.getKey(); if (oldHostMap.containsKey(key) && !StringUtils .equals(host.toString(), oldHostMap.get(key).toString())) { modHosts.add(host); continue; } if (!oldHostMap.containsKey(key)) { newHosts.add(host); } } for (Map.Entry entry : oldHostMap.entrySet()) { Instance host = entry.getValue(); String key = entry.getKey(); if (newHostMap.containsKey(key)) { continue; } if (!newHostMap.containsKey(key)) { remvHosts.add(host); } } //记录服务主机变化日志信息 if (newHosts.size() > 0) { changed = true; NAMING_LOGGER.info("new ips(" + newHosts.size() + ") service: " + serviceInfo.getKey() + " -> " + JacksonUtils.toJson(newHosts)); } if (remvHosts.size() > 0) { changed = true; NAMING_LOGGER.info("removed ips(" + remvHosts.size() + ") service: " + serviceInfo.getKey() + " -> " + JacksonUtils.toJson(remvHosts)); } if (modHosts.size() > 0) { changed = true; updateBeatInfo(modHosts); NAMING_LOGGER.info("modified ips(" + modHosts.size() + ") service: " + serviceInfo.getKey() + " -> " + JacksonUtils.toJson(modHosts)); } serviceInfo.setJsonFromServer(json); if (newHosts.size() > 0 || remvHosts.size() > 0 || modHosts.size() > 0) { //发布服务变化事件 InstancesChangeEvent NotifyCenter.publishEvent(new InstancesChangeEvent(serviceInfo.getName(), serviceInfo.getGroupName(), serviceInfo.getClusters(), serviceInfo.getHosts())); //写入缓村文件,如果开启了loadCacheAtStart,会在系统启动时从缓村加载服务信息 DiskCache.write(serviceInfo, cacheDir); } } else { //该分支是旧的服务不存在时 changed = true; NAMING_LOGGER.info("init new ips(" + serviceInfo.ipCount() + ") service: " + serviceInfo.getKey() + " -> " + JacksonUtils.toJson(serviceInfo.getHosts())); //服务信息存入内存缓存 serviceInfoMap.put(serviceInfo.getKey(), serviceInfo); //发布InstancesChangeEvent事件 NotifyCenter.publishEvent(new InstancesChangeEvent(serviceInfo.getName(), serviceInfo.getGroupName(), serviceInfo.getClusters(), serviceInfo.getHosts())); serviceInfo.setJsonFromServer(json); //写入缓存文件 DiskCache.write(serviceInfo, cacheDir); } //prometheus 监控统计 MetricsMonitor.getServiceInfoMapSizeMonitor().set(serviceInfoMap.size()); if (changed) { NAMING_LOGGER.info("current ips:(" + serviceInfo.ipCount() + ") service: " + serviceInfo.getKey() + " -> " + JacksonUtils.toJson(serviceInfo.getHosts())); } return serviceInfo; }
等待5s时间,在等待过程中,如果服务更新完成,会发出通知唤醒等待的线程
if (UPDATe_HOLD_INTERVAL > 0) { // hold a moment waiting for update finish synchronized (serviceObj) { try { serviceObj.wait(UPDATE_HOLD_INTERVAL); } catch (InterruptedException e) { NAMING_LOGGER .error("[getServiceInfo] serviceName:" + serviceName + ", clusters:" + clusters, e); } } }6、定时更新服务
(1)scheduleUpdateIfAbsent( )
public void scheduleUpdateIfAbsent(String serviceName, String clusters) { //缓存中服务是否存在 if (futureMap.get(ServiceInfo.getKey(serviceName, clusters)) != null) { return; } synchronized (futureMap) { //双重检验缓存 if (futureMap.get(ServiceInfo.getKey(serviceName, clusters)) != null) { return; } //服务更新任务 UpdateTask ScheduledFuture> future = addTask(new UpdateTask(serviceName, clusters)); futureMap.put(ServiceInfo.getKey(serviceName, clusters), future); } }
public synchronized ScheduledFuture> addTask(UpdateTask task) { //延时1s调度任务 DEFAULT_DELAY = 1000L return executor.schedule(task, DEFAULT_DELAY, TimeUnit.MILLISECONDS); }
(2)UpdateTask
public void run() { long delayTime = DEFAULT_DELAY; try { ServiceInfo serviceObj = serviceInfoMap.get(ServiceInfo.getKey(serviceName, clusters)); if (serviceObj == null) { //服务信息不存在,更新服务信息 updateService(serviceName, clusters); return; } if (serviceObj.getLastRefTime() <= lastRefTime) { //服务信息更新时间不变,即跟上次相比服务没有更新 //更新服务信息 updateService(serviceName, clusters); serviceObj = serviceInfoMap.get(ServiceInfo.getKey(serviceName, clusters)); } else { // if serviceName already updated by push, we should not override it // since the push data may be different from pull through force push //距离上次时间,服务已经更新过,只进行服务刷新,不处理拉取下来的新的服务 refreshOnly(serviceName, clusters); } //更新上次服务信息返回的时间字段 lastRefTime = serviceObj.getLastRefTime(); if (!notifier.isSubscribed(serviceName, clusters) && !futureMap .containsKey(ServiceInfo.getKey(serviceName, clusters))) { // abort the update task NAMING_LOGGER.info("update task is stopped, service:" + serviceName + ", clusters:" + clusters); return; } if (CollectionUtils.isEmpty(serviceObj.getHosts())) { //服务主机为空时,增加失败次数 incFailCount(); return; } delayTime = serviceObj.getCacheMillis(); //重置失败次数 resetFailCount(); } catch (Throwable e) { incFailCount(); NAMING_LOGGER.warn("[NA] failed to update serviceName: " + serviceName, e); } finally { //失败次数参与计算下次服务更新任务延时时间,最大是60s,Math.min(delayTime << failCount, DEFAULT_DELAY * 60) executor.schedule(this, Math.min(delayTime << failCount, DEFAULT_DELAY * 60), TimeUnit.MILLISECONDS); } }
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)