spring cloud alibaba 学习(五)HostReactor 服务更新流程

spring cloud alibaba 学习(五)HostReactor 服务更新流程,第1张

spring cloud alibaba 学习(五)HostReactor 服务更新流程

目录

1、获取服务信息2、故障转移3、从缓存中获取服务信息4、从远程更新服务信息5、服务正在更新中6、定时更新服务

1、获取服务信息

HostReactor 中的 getServiceInfo( ) 方法

public ServiceInfo getServiceInfo(final String serviceName, final String clusters) {
        
        NAMING_LOGGER.debug("failover-mode: " + failoverReactor.isFailoverSwitch());
        String key = ServiceInfo.getKey(serviceName, clusters);
        //开启故障转移模式,从failoverReactor 获取服务列表
        if (failoverReactor.isFailoverSwitch()) {
            return failoverReactor.getService(key);
        }
        
        //从缓存中获取服务列表
        ServiceInfo serviceObj = getServiceInfo0(serviceName, clusters);
        
        if (null == serviceObj) {
        	//服务信息不存在时,创建新的服务信息,并放入缓存serviceInfoMap
            serviceObj = new ServiceInfo(serviceName, clusters);
            
            serviceInfoMap.put(serviceObj.getKey(), serviceObj);
            
            //记录服务正在更新中
            updatingMap.put(serviceName, new Object());
            //更新服务
            updateServiceNow(serviceName, clusters);
            //服务更新完删除缓存
            updatingMap.remove(serviceName);
            
        } else if (updatingMap.containsKey(serviceName)) {
            //服务正在更新中,wait 5s  UPDATE_HOLD_INTERVAL = 5000L
            if (UPDATE_HOLD_INTERVAL > 0) {
                // hold a moment waiting for update finish
                synchronized (serviceObj) {
                    try {
                        serviceObj.wait(UPDATE_HOLD_INTERVAL);
                    } catch (InterruptedException e) {
                        NAMING_LOGGER
                                .error("[getServiceInfo] serviceName:" + serviceName + ", clusters:" + clusters, e);
                    }
                }
            }
        }
        
        //服务更新任务不存在时,创建服务更新任务并调度
        scheduleUpdateIfAbsent(serviceName, clusters);
        
        return serviceInfoMap.get(serviceObj.getKey());
    }
2、故障转移
	//failover-mode参数是由SwitchRefresher来处理的
	public boolean isFailoverSwitch() {
        return Boolean.parseBoolean(switchParams.get("failover-mode"));
    }
    
    //serviceMap是由FailoverFileReader来处理的
    public ServiceInfo getService(String key) {
        ServiceInfo serviceInfo = serviceMap.get(key);
        
        if (serviceInfo == null) {
            serviceInfo = new ServiceInfo();
            serviceInfo.setName(key);
        }
        
        return serviceInfo;
    }
3、从缓存中获取服务信息
 private ServiceInfo getServiceInfo0(String serviceName, String clusters) {
        
        String key = ServiceInfo.getKey(serviceName, clusters);
        //从 serviceInfoMap 中获取服务信息
        return serviceInfoMap.get(key);
    }
4、从远程更新服务信息

当缓存中不存在服务信息时,更新服务

(1)updateServiceNow( )

private void updateServiceNow(String serviceName, String clusters) {
        try {
            updateService(serviceName, clusters);
        } catch (NacosException e) {
            NAMING_LOGGER.error("[NA] failed to update serviceName: " + serviceName, e);
        }
    }

(2)updateService( )

 public void updateService(String serviceName, String clusters) throws NacosException {
 		//获取远程服务列表前,再次查询缓存中是否存在服务信息,如果存在,在finally里将等待服务列表的线程唤醒,在下面第5步中讲解等待的线程
        ServiceInfo oldService = getServiceInfo0(serviceName, clusters);
        try {
            
            String result = serverProxy.queryList(serviceName, clusters, pushReceiver.getUdpPort(), false);
            
            if (StringUtils.isNotEmpty(result)) {
                processServiceJson(result);
            }
        } finally {
            if (oldService != null) {
                synchronized (oldService) {
                    oldService.notifyAll();
                }
            }
        }
    }

(3)queryList( )

调用nacos服务端远程接口获取对应的服务信息

public String queryList(String serviceName, String clusters, int udpPort, boolean healthyOnly)
            throws NacosException {
        
        final Map params = new HashMap(8);
        params.put(CommonParams.NAMESPACE_ID, namespaceId);
        params.put(CommonParams.SERVICE_NAME, serviceName);
        params.put("clusters", clusters);
        params.put("udpPort", String.valueOf(udpPort));
        params.put("clientIP", NetUtils.localIP());
        params.put("healthyOnly", String.valueOf(healthyOnly));
        
        return reqApi(UtilAndComs.nacosUrlbase + "/instance/list", params, HttpMethod.GET);
    }

(4)processServiceJson( )

处理上一步的返回服务信息

 public ServiceInfo processServiceJson(String json) {
        ServiceInfo serviceInfo = JacksonUtils.toObj(json, ServiceInfo.class);
        ServiceInfo oldService = serviceInfoMap.get(serviceInfo.getKey());
        
        //空或者返回的服务信息异常时,返回旧的服务信息
        if (pushEmptyProtection && !serviceInfo.validate()) {
            //empty or error push, just ignore
            return oldService;
        }
        
        boolean changed = false;
        
        if (oldService != null) {
            
            if (oldService.getLastRefTime() > serviceInfo.getLastRefTime()) {
                NAMING_LOGGER.warn("out of date data received, old-t: " + oldService.getLastRefTime() + ", new-t: "
                        + serviceInfo.getLastRefTime());
            }
            
            //将新的服务信息放入缓存
            serviceInfoMap.put(serviceInfo.getKey(), serviceInfo);
            
            //旧的服务主机Map
            Map oldHostMap = new HashMap(oldService.getHosts().size());
            for (Instance host : oldService.getHosts()) {
                oldHostMap.put(host.toInetAddr(), host);
            }
            
            //新的服务主机Map
            Map newHostMap = new HashMap(serviceInfo.getHosts().size());
            for (Instance host : serviceInfo.getHosts()) {
                newHostMap.put(host.toInetAddr(), host);
            }
            
            //修改了的主机
            Set modHosts = new HashSet();
            //新上线的主机
            Set newHosts = new HashSet();
            //下线的主机
            Set remvHosts = new HashSet();
            
            List> newServiceHosts = new ArrayList>(
                    newHostMap.entrySet());
            for (Map.Entry entry : newServiceHosts) {
                Instance host = entry.getValue();
                String key = entry.getKey();
                if (oldHostMap.containsKey(key) && !StringUtils
                        .equals(host.toString(), oldHostMap.get(key).toString())) {
                    modHosts.add(host);
                    continue;
                }
                
                if (!oldHostMap.containsKey(key)) {
                    newHosts.add(host);
                }
            }
            
            for (Map.Entry entry : oldHostMap.entrySet()) {
                Instance host = entry.getValue();
                String key = entry.getKey();
                if (newHostMap.containsKey(key)) {
                    continue;
                }
                
                if (!newHostMap.containsKey(key)) {
                    remvHosts.add(host);
                }
                
            }
            
            //记录服务主机变化日志信息
            if (newHosts.size() > 0) {
                changed = true;
                NAMING_LOGGER.info("new ips(" + newHosts.size() + ") service: " + serviceInfo.getKey() + " -> "
                        + JacksonUtils.toJson(newHosts));
            }
            
            if (remvHosts.size() > 0) {
                changed = true;
                NAMING_LOGGER.info("removed ips(" + remvHosts.size() + ") service: " + serviceInfo.getKey() + " -> "
                        + JacksonUtils.toJson(remvHosts));
            }
            
            if (modHosts.size() > 0) {
                changed = true;
                updateBeatInfo(modHosts);
                NAMING_LOGGER.info("modified ips(" + modHosts.size() + ") service: " + serviceInfo.getKey() + " -> "
                        + JacksonUtils.toJson(modHosts));
            }
            
            serviceInfo.setJsonFromServer(json);
            
            if (newHosts.size() > 0 || remvHosts.size() > 0 || modHosts.size() > 0) {
            	//发布服务变化事件 InstancesChangeEvent
                NotifyCenter.publishEvent(new InstancesChangeEvent(serviceInfo.getName(), serviceInfo.getGroupName(),
                        serviceInfo.getClusters(), serviceInfo.getHosts()));
                //写入缓村文件,如果开启了loadCacheAtStart,会在系统启动时从缓村加载服务信息
                DiskCache.write(serviceInfo, cacheDir);
            }
            
        } else {
        	//该分支是旧的服务不存在时
            changed = true;
            NAMING_LOGGER.info("init new ips(" + serviceInfo.ipCount() + ") service: " + serviceInfo.getKey() + " -> "
                    + JacksonUtils.toJson(serviceInfo.getHosts()));
            //服务信息存入内存缓存
            serviceInfoMap.put(serviceInfo.getKey(), serviceInfo);
            //发布InstancesChangeEvent事件
            NotifyCenter.publishEvent(new InstancesChangeEvent(serviceInfo.getName(), serviceInfo.getGroupName(),
                    serviceInfo.getClusters(), serviceInfo.getHosts()));
            serviceInfo.setJsonFromServer(json);
            //写入缓存文件
            DiskCache.write(serviceInfo, cacheDir);
        }
        
        //prometheus 监控统计
        MetricsMonitor.getServiceInfoMapSizeMonitor().set(serviceInfoMap.size());
        
        if (changed) {
            NAMING_LOGGER.info("current ips:(" + serviceInfo.ipCount() + ") service: " + serviceInfo.getKey() + " -> "
                    + JacksonUtils.toJson(serviceInfo.getHosts()));
        }
        
        return serviceInfo;
    }
5、服务正在更新中

等待5s时间,在等待过程中,如果服务更新完成,会发出通知唤醒等待的线程

 if (UPDATe_HOLD_INTERVAL > 0) {
                // hold a moment waiting for update finish
                synchronized (serviceObj) {
                    try {
                        serviceObj.wait(UPDATE_HOLD_INTERVAL);
                    } catch (InterruptedException e) {
                        NAMING_LOGGER
                                .error("[getServiceInfo] serviceName:" + serviceName + ", clusters:" + clusters, e);
                    }
                }
            }
6、定时更新服务

(1)scheduleUpdateIfAbsent( )

public void scheduleUpdateIfAbsent(String serviceName, String clusters) {
		//缓存中服务是否存在
        if (futureMap.get(ServiceInfo.getKey(serviceName, clusters)) != null) {
            return;
        }
        
        synchronized (futureMap) {
        	//双重检验缓存
            if (futureMap.get(ServiceInfo.getKey(serviceName, clusters)) != null) {
                return;
            }
            
            //服务更新任务 UpdateTask
            ScheduledFuture future = addTask(new UpdateTask(serviceName, clusters));
            futureMap.put(ServiceInfo.getKey(serviceName, clusters), future);
        }
    }
 public synchronized ScheduledFuture addTask(UpdateTask task) {
 		//延时1s调度任务  DEFAULT_DELAY = 1000L
        return executor.schedule(task, DEFAULT_DELAY, TimeUnit.MILLISECONDS);
    }

(2)UpdateTask

public void run() {
            long delayTime = DEFAULT_DELAY;
            
            try {
                ServiceInfo serviceObj = serviceInfoMap.get(ServiceInfo.getKey(serviceName, clusters));
                
                if (serviceObj == null) {
                	//服务信息不存在,更新服务信息
                    updateService(serviceName, clusters);
                    return;
                }
                
                if (serviceObj.getLastRefTime() <= lastRefTime) {
                	//服务信息更新时间不变,即跟上次相比服务没有更新
                	//更新服务信息
                    updateService(serviceName, clusters);
                    serviceObj = serviceInfoMap.get(ServiceInfo.getKey(serviceName, clusters));
                } else {
                    // if serviceName already updated by push, we should not override it
                    // since the push data may be different from pull through force push
                    //距离上次时间,服务已经更新过,只进行服务刷新,不处理拉取下来的新的服务
                    refreshOnly(serviceName, clusters);
                }
                
                //更新上次服务信息返回的时间字段
                lastRefTime = serviceObj.getLastRefTime();
                
                if (!notifier.isSubscribed(serviceName, clusters) && !futureMap
                        .containsKey(ServiceInfo.getKey(serviceName, clusters))) {
                    // abort the update task
                    NAMING_LOGGER.info("update task is stopped, service:" + serviceName + ", clusters:" + clusters);
                    return;
                }
                if (CollectionUtils.isEmpty(serviceObj.getHosts())) {
                	//服务主机为空时,增加失败次数
                    incFailCount();
                    return;
                }
                delayTime = serviceObj.getCacheMillis();
                //重置失败次数
                resetFailCount();
            } catch (Throwable e) {
                incFailCount();
                NAMING_LOGGER.warn("[NA] failed to update serviceName: " + serviceName, e);
            } finally {
            	//失败次数参与计算下次服务更新任务延时时间,最大是60s,Math.min(delayTime << failCount, DEFAULT_DELAY * 60)
                executor.schedule(this, Math.min(delayTime << failCount, DEFAULT_DELAY * 60), TimeUnit.MILLISECONDS);
            }
        }

欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/zaji/5722233.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-18
下一篇 2022-12-18

发表评论

登录后才能评论

评论列表(0条)

保存