Nacos 默认的CAP理论的AP,只能保证最终一致性,所以需要通过心跳机制来感知服务的上下线状态
核心API 客户端 1、客户端是在注册服务的时候向服务端发起心跳请求com.alibaba.nacos.client.naming.NacosNamingService#registerInstance(java.lang.String, java.lang.String, com.alibaba.nacos.api.naming.pojo.Instance)
@Override public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException { NamingUtils.checkInstanceIsLegal(instance); String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName); // 判断是否为临时实例 if (instance.isEphemeral()) { BeatInfo beatInfo = beatReactor.buildBeatInfo(groupedServiceName, instance); beatReactor.addBeatInfo(groupedServiceName, beatInfo); } serverProxy.registerService(groupedServiceName, groupName, instance); }2、封装BeatInfo 对象
com.alibaba.nacos.client.naming.beat.BeatReactor#buildBeatInfo(java.lang.String, com.alibaba.nacos.api.naming.pojo.Instance)
public BeatInfo buildBeatInfo(String groupedServiceName, Instance instance) { BeatInfo beatInfo = new BeatInfo(); beatInfo.setServiceName(groupedServiceName); beatInfo.setIp(instance.getIp()); beatInfo.setPort(instance.getPort()); beatInfo.setCluster(instance.getClusterName()); beatInfo.setWeight(instance.getWeight()); // 元数据 beatInfo.setmetadata(instance.getmetadata()); beatInfo.setScheduled(false); // 配置心跳间隔 默认为5s发起一次心跳 beatInfo.setPeriod(instance.getInstanceHeartBeatInterval()); return beatInfo; }3、开启定时心跳任务
com.alibaba.nacos.client.naming.beat.BeatReactor#addBeatInfo
public void addBeatInfo(String serviceName, BeatInfo beatInfo) { NAMING_LOGGER.info("[BEAT] adding beat: {} to beat map.", beatInfo); String key = buildKey(serviceName, beatInfo.getIp(), beatInfo.getPort()); BeatInfo existBeat = null; //fix #1733 if ((existBeat = dom2Beat.remove(key)) != null) { existBeat.setStopped(true); } dom2Beat.put(key, beatInfo); // 开启定时心跳任务 executorService.schedule(new BeatTask(beatInfo), beatInfo.getPeriod(), TimeUnit.MILLISECONDS); MetricsMonitor.getDom2BeatSizeMonitor().set(dom2Beat.size()); }4、每过五秒钟向服务端发送心跳
@Override public void run() { if (beatInfo.isStopped()) { return; } long nextTime = beatInfo.getPeriod(); try { // 发送心跳请求 JsonNode result = serverProxy.sendBeat(beatInfo, BeatReactor.this.lightBeatEnabled); long interval = result.get("clientBeatInterval").asLong(); boolean lightBeatEnabled = false; if (result.has(CommonParams.LIGHT_BEAT_ENABLED)) { lightBeatEnabled = result.get(CommonParams.LIGHT_BEAT_ENABLED).asBoolean(); } BeatReactor.this.lightBeatEnabled = lightBeatEnabled; if (interval > 0) { nextTime = interval; } int code = NamingResponseCode.OK; if (result.has(CommonParams.CODE)) { code = result.get(CommonParams.CODE).asInt(); } if (code == NamingResponseCode.RESOURCE_NOT_FOUND) { Instance instance = new Instance(); instance.setPort(beatInfo.getPort()); instance.setIp(beatInfo.getIp()); instance.setWeight(beatInfo.getWeight()); instance.setmetadata(beatInfo.getmetadata()); instance.setClusterName(beatInfo.getCluster()); instance.setServiceName(beatInfo.getServiceName()); instance.setInstanceId(instance.getInstanceId()); instance.setEphemeral(true); try { serverProxy.registerService(beatInfo.getServiceName(), NamingUtils.getGroupName(beatInfo.getServiceName()), instance); } catch (Exception ignore) { } } } catch (NacosException ex) { NAMING_LOGGER.error("[CLIENT-BEAT] failed to send beat: {}, code: {}, msg: {}", JacksonUtils.toJson(beatInfo), ex.getErrCode(), ex.getErrMsg()); } // 嵌套调用心跳任务 executorService.schedule(new BeatTask(beatInfo), nextTime, TimeUnit.MILLISECONDS); }5、发起心跳请求
com.alibaba.nacos.client.naming.net.NamingProxy#sendBeat
public JsonNode sendBeat(BeatInfo beatInfo, boolean lightBeatEnabled) throws NacosException { if (NAMING_LOGGER.isDebugEnabled()) { NAMING_LOGGER.debug("[BEAT] {} sending beat to server: {}", namespaceId, beatInfo.toString()); } Map服务端 1、处理客户端心跳请求,如果实例不存在会进行注册params = new HashMap (8); Map bodyMap = new HashMap (2); if (!lightBeatEnabled) { bodyMap.put("beat", JacksonUtils.toJson(beatInfo)); } // namespace params.put(CommonParams.NAMESPACE_ID, namespaceId); // 服务名称 params.put(CommonParams.SERVICE_NAME, beatInfo.getServiceName()); // 集群名称 params.put(CommonParams.CLUSTER_NAME, beatInfo.getCluster()); params.put("ip", beatInfo.getIp()); params.put("port", String.valueOf(beatInfo.getPort())); String result = reqApi(UtilAndComs.nacosUrlbase + "/instance/beat", params, bodyMap, HttpMethod.PUT); return JacksonUtils.toObj(result); }
com.alibaba.nacos.naming.controllers.InstanceController#beat
@CanDistro @PutMapping("/beat") @Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE) public ObjectNode beat(HttpServletRequest request) throws Exception { ObjectNode result = JacksonUtils.createEmptyJsonNode(); result.put(SwitchEntry.CLIENT_BEAT_INTERVAL, switchDomain.getClientBeatInterval()); String beat = WebUtils.optional(request, "beat", StringUtils.EMPTY); RsInfo clientBeat = null; if (StringUtils.isNotBlank(beat)) { clientBeat = JacksonUtils.toObj(beat, RsInfo.class); } String clusterName = WebUtils .optional(request, CommonParams.CLUSTER_NAME, UtilsAndCommons.DEFAULT_CLUSTER_NAME); String ip = WebUtils.optional(request, "ip", StringUtils.EMPTY); int port = Integer.parseInt(WebUtils.optional(request, "port", "0")); if (clientBeat != null) { if (StringUtils.isNotBlank(clientBeat.getCluster())) { clusterName = clientBeat.getCluster(); } else { // fix #2533 clientBeat.setCluster(clusterName); } ip = clientBeat.getIp(); port = clientBeat.getPort(); } String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID); String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME); NamingUtils.checkServiceNameFormat(serviceName); Loggers.SRV_LOG.debug("[CLIENT-BEAT] full arguments: beat: {}, serviceName: {}", clientBeat, serviceName); // 获取实例 Instance instance = serviceManager.getInstance(namespaceId, serviceName, clusterName, ip, port); // 如果实例不存在会进行注册 if (instance == null) { if (clientBeat == null) { result.put(CommonParams.CODE, NamingResponseCode.RESOURCE_NOT_FOUND); return result; } Loggers.SRV_LOG.warn("[CLIENT-BEAT] The instance has been removed for health mechanism, " + "perform data compensation operations, beat: {}, serviceName: {}", clientBeat, serviceName); instance = new Instance(); instance.setPort(clientBeat.getPort()); instance.setIp(clientBeat.getIp()); instance.setWeight(clientBeat.getWeight()); instance.setmetadata(clientBeat.getmetadata()); instance.setClusterName(clusterName); instance.setServiceName(serviceName); instance.setInstanceId(instance.getInstanceId()); instance.setEphemeral(clientBeat.isEphemeral()); serviceManager.registerInstance(namespaceId, serviceName, instance); } Service service = serviceManager.getService(namespaceId, serviceName); if (service == null) { throw new NacosException(NacosException.SERVER_ERROR, "service not found: " + serviceName + "@" + namespaceId); } if (clientBeat == null) { clientBeat = new RsInfo(); clientBeat.setIp(ip); clientBeat.setPort(port); clientBeat.setCluster(clusterName); } // 处理客户端心跳请求 service.processClientBeat(clientBeat); result.put(CommonParams.CODE, NamingResponseCode.OK); if (instance.containsmetadata(PreservedmetadataKeys.HEART_BEAT_INTERVAL)) { result.put(SwitchEntry.CLIENT_BEAT_INTERVAL, instance.getInstanceHeartBeatInterval()); } result.put(SwitchEntry.LIGHT_BEAT_ENABLED, switchDomain.isLightBeatEnabled()); return result; }2、开启一个定时任务,更新实例的最后心跳时间
com.alibaba.nacos.naming.core.Service#processClientBeat
public void processClientBeat(final RsInfo rsInfo) { ClientBeatProcessor clientBeatProcessor = new ClientBeatProcessor(); clientBeatProcessor.setService(this); clientBeatProcessor.setRsInfo(rsInfo); HealthCheckReactor.scheduleNow(clientBeatProcessor); }
com.alibaba.nacos.naming.healthcheck.ClientBeatProcessor#run
@Override public void run() { Service service = this.service; if (Loggers.EVT_LOG.isDebugEnabled()) { Loggers.EVT_LOG.debug("[CLIENT-BEAT] processing beat: {}", rsInfo.toString()); } String ip = rsInfo.getIp(); String clusterName = rsInfo.getCluster(); int port = rsInfo.getPort(); // 获取集群 Cluster cluster = service.getClusterMap().get(clusterName); // 获取集群下全部实例 Listinstances = cluster.allIPs(true); for (Instance instance : instances) { if (instance.getIp().equals(ip) && instance.getPort() == port) { if (Loggers.EVT_LOG.isDebugEnabled()) { Loggers.EVT_LOG.debug("[CLIENT-BEAT] refresh beat: {}", rsInfo.toString()); } // 更新最后心跳时间 instance.setLastBeat(System.currentTimeMillis()); if (!instance.isMarked()) { if (!instance.isHealthy()) { instance.setHealthy(true); Loggers.EVT_LOG .info("service: {} {POS} {IP-ENABLED} valid: {}:{}@{}, region: {}, msg: client beat ok", cluster.getService().getName(), ip, port, cluster.getName(), UtilsAndCommons.LOCALHOST_SITE); // 发布服务变动事件 getPushService().serviceChanged(service); } } } } }
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)