主要内容
springcloud 2020.0.4
springcloud alibaba 2021.1
引用的 nacos 版本:1.4.1
源码分析项目代码地址
当前博客主要为 nacos-client 端的代码分析,主要介绍客户端如何与服务端通讯,包括:
- nacos-client 如何将当前微服务注册到 nacos-server
- nacos-client 如何维护与 nacos-server 的心跳
- nacos-client 每 10s 轮询拉取服务实例列表
- nacos-client 如何接收 nacos-server 的服务变更推送(udp方式)
nacos注册中心整合springcloud
项目pom如下。nacos与springcloud的整合,主要由以下 starter 实现:
<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>
当前starter模块的类结构如下:
配置类 NacosDiscoveryAutoConfiguration
@Configuration(proxyBeanMethods = false) @ConditionalOnDiscoveryEnabled @ConditionalOnNacosDiscoveryEnabled public class NacosDiscoveryAutoConfiguration { // 装配 nacos 注册中心属性 @Bean @ConditionalOnMissingBean public NacosDiscoveryProperties nacosProperties() { return new NacosDiscoveryProperties(); } // NacosServiceDiscovery 封装了与注册中心交互获取服务实例的方法 // 如:ListgetInstances(String serviceId) @Bean @ConditionalOnMissingBean public NacosServiceDiscovery nacosServiceDiscovery( NacosDiscoveryProperties discoveryProperties, NacosServiceManager nacosServiceManager) { return new NacosServiceDiscovery(discoveryProperties, nacosServiceManager); } }
配置类NacosDiscoveryEndpointAutoConfiguration 用于暴露服务健康状态、健康指标等数据。
配置类NacosServiceRegistryAutoConfiguration,装配服务注册相关功能(重点)
@Configuration(proxyBeanMethods = false) @EnableConfigurationProperties @ConditionalOnNacosDiscoveryEnabled @ConditionalOnProperty(value = "spring.cloud.service-registry.auto-registration.enabled", matchIfMissing = true) @AutoConfigureAfter({ AutoServiceRegistrationConfiguration.class, AutoServiceRegistrationAutoConfiguration.class, NacosDiscoveryAutoConfiguration.class }) public class NacosServiceRegistryAutoConfiguration { // 服务注册类:当前微服务信息,由这个类注册到nacos服务注册中心中 @Bean public NacosServiceRegistry nacosServiceRegistry( NacosDiscoveryProperties nacosDiscoveryProperties) { return new NacosServiceRegistry(nacosDiscoveryProperties); } // 服务注册信息类 @Bean @ConditionalOnBean(AutoServiceRegistrationProperties.class) public NacosRegistration nacosRegistration( ObjectProvider服务注册过程分析> registrationCustomizers, NacosDiscoveryProperties nacosDiscoveryProperties, ApplicationContext context) { return new NacosRegistration(registrationCustomizers.getIfAvailable(), nacosDiscoveryProperties, context); } // 服务自动注册类,由这个类调用 NacosServiceRegistry 中的注册 @Bean @ConditionalOnBean(AutoServiceRegistrationProperties.class) public NacosAutoServiceRegistration nacosAutoServiceRegistration( NacosServiceRegistry registry, AutoServiceRegistrationProperties autoServiceRegistrationProperties, NacosRegistration registration) { return new NacosAutoServiceRegistration(registry, autoServiceRegistrationProperties, registration); } }
服务注册过程分析
由 WebServerInitializedEvent 事件触发服务注册
// AbstractAutoServiceRegistration: springcloud 提供的服务自动注册顶层封装类 public abstract class AbstractAutoServiceRegistrationimplements AutoServiceRegistration, ApplicationContextAware, ApplicationListener { // 收到服务初始化完成的事件回调 @Override @SuppressWarnings("deprecation") public void onApplicationEvent(WebServerInitializedEvent event) { bind(event); } @Deprecated public void bind(WebServerInitializedEvent event) { ApplicationContext context = event.getApplicationContext(); if (context instanceof ConfigurableWebServerApplicationContext) { if ("management".equals(((ConfigurableWebServerApplicationContext) context).getServerNamespace())) { return; } } this.port.compareAndSet(0, event.getWebServer().getPort()); this.start(); } public void start() { if (!isEnabled()) { if (logger.isDebugEnabled()) { logger.debug("Discovery Lifecycle disabled. Not starting"); } return; } // only initialize if nonSecurePort is greater than 0 and it isn't already running // because of containerPortInitializer below if (!this.running.get()) { // ## 发布服务预注册事件 this.context.publishEvent(new InstancePreRegisteredEvent(this, getRegistration())); // ## 执行注册(这里由 NacosServiceRegistry 实现) register(); if (shouldRegisterManagement()) { registerManagement(); } // 发布服务注册完成事件 this.context.publishEvent(new InstanceRegisteredEvent<>(this, getConfiguration())); this.running.compareAndSet(false, true); } } }
nacos客户端进行服务注册 NacosServiceRegistry
public class NacosServiceRegistry implements ServiceRegistry{ @Override public void register(Registration registration) { if (StringUtils.isEmpty(registration.getServiceId())) { log.warn("No service to register for nacos client..."); return; } // ## 获取注册服务 NamingService namingService = namingService(); // ## 服务名称 ${spring.application.name} String serviceId = registration.getServiceId(); // ## 服务分组,默认 DEFAULT_GROUP String group = nacosDiscoveryProperties.getGroup(); // ## 构建服务信息类 Instance instance = getNacosInstanceFromRegistration(registration); try { // ### 注册服务执行注册(见下 NacosNamingService ) namingService.registerInstance(serviceId, group, instance); log.info("nacos registry, {} {} {}:{} register finished", group, serviceId, instance.getIp(), instance.getPort()); } catch (Exception e) { log.error("nacos registry, {} register failed...{},", serviceId, registration.toString(), e); // rethrow a RuntimeException if the registration is failed. // issue : https://github.com/alibaba/spring-cloud-alibaba/issues/1132 rethrowRuntimeException(e); } } }
注册中心服务类:NamingService
public class NacosNamingService implements NamingService { @Override public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException { NamingUtils.checkInstanceIsLegal(instance); String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName); // ## 临时节点,添加心跳任务(见下方BeatReactor) if (instance.isEphemeral()) { BeatInfo beatInfo = beatReactor.buildBeatInfo(groupedServiceName, instance); beatReactor.addBeatInfo(groupedServiceName, beatInfo); } // 执行注册 serverProxy.registerService(groupedServiceName, groupName, instance); } }
实际与注册中心交互的类:NamingProxy
public class NamingProxy implements Closeable { public void registerService(String serviceName, String groupName, Instance instance) throws NacosException { NAMING_LOGGER.info("[REGISTER-SERVICE] {} registering service {} with instance: {}", namespaceId, serviceName, instance); // 构造当前服务节点的参数信息 final Mapparams = new HashMap (16); params.put(CommonParams.NAMESPACE_ID, namespaceId); // namespace params.put(CommonParams.SERVICE_NAME, serviceName); // serviceName params.put(CommonParams.GROUP_NAME, groupName); // 分组信息 params.put(CommonParams.CLUSTER_NAME, instance.getClusterName()); // 集群信息,默认 DEFAULT params.put("ip", instance.getIp()); // 当前节点ip params.put("port", String.valueOf(instance.getPort())); // 当前节点端口 params.put("weight", String.valueOf(instance.getWeight())); // 当前节点权重 params.put("enable", String.valueOf(instance.isEnabled())); params.put("healthy", String.valueOf(instance.isHealthy())); // 当前节点健康状态 params.put("ephemeral", String.valueOf(instance.isEphemeral())); // 是否临时节点 params.put("metadata", JacksonUtils.toJson(instance.getmetadata())); // 元数据 // 执行请求 reqApi(UtilAndComs.nacosUrlInstance, params, HttpMethod.POST); } // 发起请求 public String reqApi(String api, Map params, Map body, List servers, String method) throws NacosException { params.put(CommonParams.NAMESPACE_ID, getNamespaceId()); if (CollectionUtils.isEmpty(servers) && StringUtils.isBlank(nacosDomain)) { throw new NacosException(NacosException.INVALID_PARAM, "no server available"); } NacosException exception = new NacosException(); if (StringUtils.isNotBlank(nacosDomain)) { for (int i = 0; i < maxRetry; i++) { // 重试机制 try { #### http调用nacos服务端进行服务注册 return callServer(api, params, body, nacosDomain, method); } catch (NacosException e) { exception = e; if (NAMING_LOGGER.isDebugEnabled()) { NAMING_LOGGER.debug("request {} failed.", nacosDomain, e); } } } } NAMING_LOGGER.error("request: {} failed, servers: {}, code: {}, msg: {}", api, servers, exception.getErrCode(), exception.getErrMsg()); throw new 
NacosException(exception.getErrCode(), "failed to req API:" + api + " after all servers(" + servers + ") tried: " + exception.getMessage()); } public String callServer(String api, Map params, Map body, String curServer, String method) throws NacosException { long start = System.currentTimeMillis(); long end = 0; injectSecurityInfo(params); Header header = builderHeader(); String url; if (curServer.startsWith(UtilAndComs.HTTPS) || curServer.startsWith(UtilAndComs.HTTP)) { url = curServer + api; } else { if (!IPUtil.containsPort(curServer)) { curServer = curServer + IPUtil.IP_PORT_SPLITER + serverPort; } url = NamingHttpClientManager.getInstance().getPrefix() + curServer + api; } try { // ## 使用 restTemplate 方式请求 nacos服务端 进行服务注册 HttpRestResult restResult = nacosRestTemplate .exchangeForm(url, header, Query.newInstance().initParams(params), body, method, String.class); end = System.currentTimeMillis(); MetricsMonitor.getNamingRequestMonitor(method, url, String.valueOf(restResult.getCode())) .observe(end - start); if (restResult.ok()) { return restResult.getData(); } if (HttpStatus.SC_NOT_MODIFIED == restResult.getCode()) { return StringUtils.EMPTY; } throw new NacosException(restResult.getCode(), restResult.getMessage()); } catch (Exception e) { NAMING_LOGGER.error("[NA] failed to request", e); throw new NacosException(NacosException.SERVER_ERROR, e); } } }
callServer 的参数信息
到这里,服务注册就完成了,可以在nacos注册中心的服务管理下面,看到注册的服务信息。
NacosNamingService 初始化了 BeatReactor
public class NacosNamingService implements NamingService { private HostReactor hostReactor; // 主机反应堆:定时拉取服务列表,维护upd推送服务 private BeatReactor beatReactor; // 心跳反应堆:向服务端上报心跳 private NamingProxy serverProxy; // 注册中心服务代理,与注册服务端通讯 private void init(Properties properties) throws NacosException { ValidatorUtils.checkInitParam(properties); this.namespace = InitUtils.initNamespaceForNaming(properties); InitUtils.initSerialization(); initServerAddr(properties); InitUtils.initWebRootContext(properties); initCacheDir(); initLogName(properties); this.serverProxy = new NamingProxy(this.namespace, this.endpoint, this.serverList, properties); this.beatReactor = new BeatReactor(this.serverProxy, initClientBeatThreadCount(properties)); this.hostReactor = new HostReactor(this.serverProxy, beatReactor, this.cacheDir, isLoadCacheAtStart(properties), isPushEmptyProtect(properties), initPollingThreadCount(properties)); } }
BeatReactor 代码分析
public class BeatReactor implements Closeable { // 构建函数 public BeatReactor(NamingProxy serverProxy, int threadCount) { this.serverProxy = serverProxy; // 创建了一个调度线程池 this.executorService = new ScheduledThreadPoolExecutor(threadCount, new ThreadFactory() { @Override public Thread newThread(Runnable r) { Thread thread = new Thread(r); thread.setDaemon(true); thread.setName("com.alibaba.nacos.naming.beat.sender"); return thread; } }); } // 由 NacosNamingService 在服务注册时调用 public void addBeatInfo(String serviceName, BeatInfo beatInfo) { NAMING_LOGGER.info("[BEAT] adding beat: {} to beat map.", beatInfo); String key = buildKey(serviceName, beatInfo.getIp(), beatInfo.getPort()); BeatInfo existBeat = null; //fix #1733 if ((existBeat = dom2Beat.remove(key)) != null) { existBeat.setStopped(true); } dom2Beat.put(key, beatInfo); // 开始一个心跳任务 executorService.schedule(new BeatTask(beatInfo), beatInfo.getPeriod(), TimeUnit.MILLISECONDS); MetricsMonitor.getDom2BeatSizeMonitor().set(dom2Beat.size()); } // 心跳服务类 class BeatTask implements Runnable { BeatInfo beatInfo; public BeatTask(BeatInfo beatInfo) { this.beatInfo = beatInfo; } @Override public void run() { if (beatInfo.isStopped()) { return; } long nextTime = beatInfo.getPeriod(); try { // 发送心跳 JsonNode result = serverProxy.sendBeat(beatInfo, BeatReactor.this.lightBeatEnabled); long interval = result.get("clientBeatInterval").asLong(); boolean lightBeatEnabled = false; if (result.has(CommonParams.LIGHT_BEAT_ENABLED)) { lightBeatEnabled = result.get(CommonParams.LIGHT_BEAT_ENABLED).asBoolean(); } BeatReactor.this.lightBeatEnabled = lightBeatEnabled; if (interval > 0) { nextTime = interval; } int code = NamingResponseCode.OK; if (result.has(CommonParams.CODE)) { code = result.get(CommonParams.CODE).asInt(); } // 发送心跳时,服务尚未注册,则注册服务 if (code == NamingResponseCode.RESOURCE_NOT_FOUND) { Instance instance = new Instance(); instance.setPort(beatInfo.getPort()); instance.setIp(beatInfo.getIp()); instance.setWeight(beatInfo.getWeight()); 
instance.setmetadata(beatInfo.getmetadata()); instance.setClusterName(beatInfo.getCluster()); instance.setServiceName(beatInfo.getServiceName()); instance.setInstanceId(instance.getInstanceId()); instance.setEphemeral(true); try { serverProxy.registerService(beatInfo.getServiceName(), NamingUtils.getGroupName(beatInfo.getServiceName()), instance); } catch (Exception ignore) { } } } catch (NacosException ex) { NAMING_LOGGER.error("[CLIENT-BEAT] failed to send beat: {}, code: {}, msg: {}", JacksonUtils.toJson(beatInfo), ex.getErrCode(), ex.getErrMsg()); } // 心跳处理完成后,再添加一个心跳任务 executorService.schedule(new BeatTask(beatInfo), nextTime, TimeUnit.MILLISECONDS); } } }
心跳发送的数据
public class HostReactor implements Closeable { // 本地服务缓存 private final MapserviceInfoMap; public HostReactor(NamingProxy serverProxy, BeatReactor beatReactor, String cacheDir, boolean loadCacheAtStart, boolean pushEmptyProtection, int pollingThreadCount) { // init executorService // # 任务调度线程池 this.executor = new ScheduledThreadPoolExecutor(pollingThreadCount, new ThreadFactory() { @Override public Thread newThread(Runnable r) { Thread thread = new Thread(r); thread.setDaemon(true); thread.setName("com.alibaba.nacos.client.naming.updater"); return thread; } }); // # 心跳响应堆 this.beatReactor = beatReactor; this.serverProxy = serverProxy; this.cacheDir = cacheDir; if (loadCacheAtStart) { this.serviceInfoMap = new ConcurrentHashMap (DiskCache.read(this.cacheDir)); } else { this.serviceInfoMap = new ConcurrentHashMap (16); } this.pushEmptyProtection = pushEmptyProtection; this.updatingMap = new ConcurrentHashMap (); // 处理错误信息的:本地文件缓存失败处理 this.failoverReactor = new FailoverReactor(this, cacheDir); // ## ** 初始化UDP接收服务 this.pushReceiver = new PushReceiver(this); this.notifier = new InstancesChangeNotifier(); NotifyCenter.registerToPublisher(InstancesChangeEvent.class, 16384); NotifyCenter.registerSubscriber(notifier); } // 按服务名称更新服务列表 public void updateService(String serviceName, String clusters) throws NacosException { ServiceInfo oldService = getServiceInfo0(serviceName, clusters); try { // 到远程服务端查询服务列表 String result = serverProxy.queryList(serviceName, clusters, pushReceiver.getUdpPort(), false); // 处理获取到的服务列表,并保存到 serviceInfoMap 缓存中 if (StringUtils.isNotEmpty(result)) { processServiceJson(result); } } finally { if (oldService != null) { synchronized (oldService) { oldService.notifyAll(); } } } } // 定时更新服务 public class UpdateTask implements Runnable { long lastRefTime = Long.MAX_VALUE; private final String clusters; private final String serviceName; private int failCount = 0; public UpdateTask(String serviceName, String clusters) { this.serviceName = serviceName; 
this.clusters = clusters; } private void incFailCount() { int limit = 6; if (failCount == limit) { return; } failCount++; } private void resetFailCount() { failCount = 0; } @Override public void run() { // 默认10s long delayTime = DEFAULT_DELAY; try { ServiceInfo serviceObj = serviceInfoMap.get(ServiceInfo.getKey(serviceName, clusters)); if (serviceObj == null) { // 本地缓存没有,从服务端拉取,直接返回 updateService(serviceName, clusters); return; } // 未接收到服务端推送,刷新服务列表信息 if (serviceObj.getLastRefTime() <= lastRefTime) { updateService(serviceName, clusters); serviceObj = serviceInfoMap.get(ServiceInfo.getKey(serviceName, clusters)); } else { // if serviceName already updated by push, we should not override it // since the push data may be different from pull through force push // 如果期间接收到了服务端的推送,就不需要再更新了 refreshOnly(serviceName, clusters); } lastRefTime = serviceObj.getLastRefTime(); if (!notifier.isSubscribed(serviceName, clusters) && !futureMap .containsKey(ServiceInfo.getKey(serviceName, clusters))) { // abort the update task NAMING_LOGGER.info("update task is stopped, service:" + serviceName + ", clusters:" + clusters); return; } if (CollectionUtils.isEmpty(serviceObj.getHosts())) { incFailCount(); return; } delayTime = serviceObj.getCacheMillis(); // 10s resetFailCount(); } catch (Throwable e) { incFailCount(); NAMING_LOGGER.warn("[NA] failed to update serviceName: " + serviceName, e); } finally { // 再添加一个定时任务,去拉取服务列表信息。 // 衰减重试机制 executor.schedule(this, Math.min(delayTime << failCount, DEFAULT_DELAY * 60), TimeUnit.MILLISECONDS); } } } }
刷新服务列表时,会向服务端发送 udp 推送相关信息:udp端口
public class PushReceiver implements Runnable, Closeable { public PushReceiver(HostReactor hostReactor) { try { this.hostReactor = hostReactor; // 会随机绑定一个可用的端口,作为udp的推送端口 this.udpSocket = new DatagramSocket(); // 一个线程的线程池 this.executorService = new ScheduledThreadPoolExecutor(1, new ThreadFactory() { @Override public Thread newThread(Runnable r) { Thread thread = new Thread(r); thread.setDaemon(true); thread.setName("com.alibaba.nacos.naming.push.receiver"); return thread; } }); // 启动当前线程,执行run方法 this.executorService.execute(this); } catch (Exception e) { NAMING_LOGGER.error("[NA] init udp socket failed", e); } } @Override public void run() { while (!closed) { try { // byte[] is initialized with 0 full filled by default byte[] buffer = new byte[UDP_MSS]; DatagramPacket packet = new DatagramPacket(buffer, buffer.length); // udp方式接收服务发送过来的数据 udpSocket.receive(packet); String json = new String(IoUtils.tryDecompress(packet.getData()), UTF_8).trim(); NAMING_LOGGER.info("received push data: " + json + " from " + packet.getAddress().toString()); PushPacket pushPacket = JacksonUtils.toObj(json, PushPacket.class); String ack; // 如果dom变更,或者服务变更 if ("dom".equals(pushPacket.type) || "service".equals(pushPacket.type)) { // 更新服务数据 hostReactor.processServiceJson(pushPacket.data); // send ack to server ack = "{"type": "push-ack"" + ", "lastRefTime":"" + pushPacket.lastRefTime + "", "data":" + """}"; } else if ("dump".equals(pushPacket.type)) { // dump data to server ack = "{"type": "dump-ack"" + ", "lastRefTime": "" + pushPacket.lastRefTime + "", "data":" + """ + StringUtils.escapeJavascript(JacksonUtils.toJson(hostReactor.getServiceInfoMap())) + ""}"; } else { // do nothing send ack only ack = "{"type": "unknown-ack"" + ", "lastRefTime":"" + pushPacket.lastRefTime + "", "data":" + """}"; } // 发送响应 udpSocket.send(new DatagramPacket(ack.getBytes(UTF_8), ack.getBytes(UTF_8).length, packet.getSocketAddress())); } catch (Exception e) { if (closed) { return; } NAMING_LOGGER.error("[NA] 
error while receiving push data", e); } } } }
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)