11.nacos服务注册源码分析之提供者服务注册

11.nacos服务注册源码分析之提供者服务注册,第1张

源码环境说明:

springcloud 2020.0.4
springcloud alibaba 2021.1
引用的 nacos 版本:1.4.1
源码分析项目代码地址

主要内容

当前博客主要为 nacos-client 端的代码分析,主要介绍客户端如何与服务端通讯,包括:

    - nacos-client 如何将当前微服务注册到 nacos-server
    - nacos-client 如何维护与 nacos-server 的心跳
    - nacos-client 每 10s 轮询拉取服务实例列表
    - nacos-client 如何接收 nacos-server 的服务变更推送(UDP 方式)
服务提供者服务注册源码分析

nacos 注册中心整合 springcloud 项目的 pom 依赖如下。nacos 与 springcloud 的整合,主要由以下 starter 实现:

    <dependency>
        <groupId>com.alibaba.cloud</groupId>
        <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
    </dependency>

当前starter模块的类结构如下:

主要的装配类说明

配置类 NacosDiscoveryAutoConfiguration

/**
 * Auto-configuration that declares the Nacos discovery beans shown below.
 * It only applies when both discovery conditions annotated on the class hold.
 */
@Configuration(proxyBeanMethods = false)
@ConditionalOnDiscoveryEnabled
@ConditionalOnNacosDiscoveryEnabled
public class NacosDiscoveryAutoConfiguration {
	// Registers the Nacos registry-center properties bean
	// (skipped when a bean of that type is already defined).
	@Bean
	@ConditionalOnMissingBean	
	public NacosDiscoveryProperties nacosProperties() {
		return new NacosDiscoveryProperties();
	}
	// NacosServiceDiscovery wraps the calls to the registry center that fetch
	// service instances, e.g. getInstances(String serviceId).
	@Bean
	@ConditionalOnMissingBean
	public NacosServiceDiscovery nacosServiceDiscovery(
			NacosDiscoveryProperties discoveryProperties,
			NacosServiceManager nacosServiceManager) {
		return new NacosServiceDiscovery(discoveryProperties, nacosServiceManager);
	}
}

配置类NacosDiscoveryEndpointAutoConfiguration 用于暴露服务健康状态、健康指标等数据。
配置类NacosServiceRegistryAutoConfiguration,装配服务注册相关功能(重点)

@Configuration(proxyBeanMethods = false)
@EnableConfigurationProperties
@ConditionalOnNacosDiscoveryEnabled
@ConditionalOnProperty(value = "spring.cloud.service-registry.auto-registration.enabled", matchIfMissing = true)
@AutoConfigureAfter({ AutoServiceRegistrationConfiguration.class,
		AutoServiceRegistrationAutoConfiguration.class,
		NacosDiscoveryAutoConfiguration.class })
public class NacosServiceRegistryAutoConfiguration {
	// 服务注册类:当前微服务信息,由这个类注册到nacos服务注册中心中
	@Bean
	public NacosServiceRegistry nacosServiceRegistry(
			NacosDiscoveryProperties nacosDiscoveryProperties) {
		return new NacosServiceRegistry(nacosDiscoveryProperties);
	}
	// 服务注册信息类
	@Bean
	@ConditionalOnBean(AutoServiceRegistrationProperties.class)
	public NacosRegistration nacosRegistration(
			ObjectProvider> registrationCustomizers,
			NacosDiscoveryProperties nacosDiscoveryProperties,
			ApplicationContext context) {
		return new NacosRegistration(registrationCustomizers.getIfAvailable(),
				nacosDiscoveryProperties, context);
	}
	// 服务自动注册类,由这个类调用 NacosServiceRegistry 中的注册
	@Bean
	@ConditionalOnBean(AutoServiceRegistrationProperties.class)
	public NacosAutoServiceRegistration nacosAutoServiceRegistration(
			NacosServiceRegistry registry,
			AutoServiceRegistrationProperties autoServiceRegistrationProperties,
			NacosRegistration registration) {
		return new NacosAutoServiceRegistration(registry,
				autoServiceRegistrationProperties, registration);
	}
}
服务注册过程分析

由 WebServerInitializedEvent 事件触发服务注册

// AbstractAutoServiceRegistration: springcloud 提供的服务自动注册顶层封装类
/**
 * Spring Cloud's top-level base class for automatic service registration (excerpt).
 * It listens for {@link WebServerInitializedEvent} and starts the registration of
 * the local instance once the web server has been initialized.
 *
 * @param <R> the registration type handled by the concrete subclass
 */
public abstract class AbstractAutoServiceRegistration<R extends Registration>
		implements AutoServiceRegistration, ApplicationContextAware, ApplicationListener<WebServerInitializedEvent> {

	/**
	 * Callback fired when the web server has finished initializing.
	 * The listener must be parameterized with {@code WebServerInitializedEvent};
	 * with a raw {@code ApplicationListener} this method would not override anything.
	 */
	@Override
	@SuppressWarnings("deprecation")
	public void onApplicationEvent(WebServerInitializedEvent event) {
		bind(event);
	}

	/**
	 * Records the web server port and starts registration, unless the event comes
	 * from the "management" server namespace.
	 */
	@Deprecated
	public void bind(WebServerInitializedEvent event) {
		ApplicationContext context = event.getApplicationContext();
		if (context instanceof ConfigurableWebServerApplicationContext) {
			// Events raised by the management context must not trigger a registration.
			if ("management".equals(((ConfigurableWebServerApplicationContext) context).getServerNamespace())) {
				return;
			}
		}
		// Only take the port from the event if it has not been set yet (still 0).
		this.port.compareAndSet(0, event.getWebServer().getPort());
		this.start();
	}

	/**
	 * Runs the registration sequence once: pre-registered event, register,
	 * optional management registration, registered event.
	 */
	public void start() {
		if (!isEnabled()) {
			if (logger.isDebugEnabled()) {
				logger.debug("Discovery Lifecycle disabled. Not starting");
			}
			return;
		}
		// only initialize if nonSecurePort is greater than 0 and it isn't already running
		// because of containerPortInitializer below
		if (!this.running.get()) {
			// Publish the "instance pre-registered" event.
			this.context.publishEvent(new InstancePreRegisteredEvent(this, getRegistration()));
			// Perform the registration (carried out by NacosServiceRegistry in the Nacos integration).
			register();
			if (shouldRegisterManagement()) {
				registerManagement();
			}
			// Publish the "instance registered" event.
			this.context.publishEvent(new InstanceRegisteredEvent<>(this, getConfiguration()));
			this.running.compareAndSet(false, true);
		}

	}
}

nacos客户端进行服务注册 NacosServiceRegistry

/**
 * Nacos implementation of the Spring Cloud service registry (excerpt): turns a
 * {@code Registration} into a Nacos {@code Instance} and registers it through
 * the Nacos {@code NamingService}.
 */
// Parameterized with Registration instead of the raw ServiceRegistry type that
// the HTML extraction left behind.
public class NacosServiceRegistry implements ServiceRegistry<Registration> {

	/**
	 * Registers the given service with Nacos.
	 *
	 * <p>A registration without a service id is skipped with a warning. Any failure
	 * during registration is logged and rethrown via {@code rethrowRuntimeException}.
	 *
	 * @param registration description of the local service instance
	 */
	@Override
	public void register(Registration registration) {
		if (StringUtils.isEmpty(registration.getServiceId())) {
			log.warn("No service to register for nacos client...");
			return;
		}
		// Obtain the Nacos naming service client.
		NamingService namingService = namingService();
		// Service name, i.e. ${spring.application.name}.
		String serviceId = registration.getServiceId();
		// Service group, DEFAULT_GROUP by default.
		String group = nacosDiscoveryProperties.getGroup();
		// Build the Nacos instance descriptor from the registration.
		Instance instance = getNacosInstanceFromRegistration(registration);

		try {
			// Delegate the actual registration to the naming service (NacosNamingService).
			namingService.registerInstance(serviceId, group, instance);
			log.info("nacos registry, {} {} {}:{} register finished", group, serviceId,
					instance.getIp(), instance.getPort());
		}
		catch (Exception e) {
			log.error("nacos registry, {} register failed...{},", serviceId,
					registration.toString(), e);
			// rethrow a RuntimeException if the registration is failed.
			// issue : https://github.com/alibaba/spring-cloud-alibaba/issues/1132
			rethrowRuntimeException(e);
		}
	}
}

注册中心服务类:NamingService

/**
 * Naming service client (excerpt): entry point used to register an instance.
 */
public class NacosNamingService implements NamingService {

    /**
     * Validates the instance, starts a heartbeat for ephemeral instances and then
     * sends the registration to the server.
     *
     * @param serviceName plain service name
     * @param groupName   group the service belongs to
     * @param instance    instance to register
     * @throws NacosException propagated from validation or from the server call
     */
    @Override
    public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
        NamingUtils.checkInstanceIsLegal(instance);
        final String fullName = NamingUtils.getGroupedName(serviceName, groupName);
        if (instance.isEphemeral()) {
            // Ephemeral instance: hand a heartbeat task to the BeatReactor first.
            beatReactor.addBeatInfo(fullName, beatReactor.buildBeatInfo(fullName, instance));
        }
        // Send the registration request.
        serverProxy.registerService(fullName, groupName, instance);
    }
}

实际与注册中心交互的类:NamingProxy

public class NamingProxy implements Closeable {
	public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {        
        NAMING_LOGGER.info("[REGISTER-SERVICE] {} registering service {} with instance: {}", namespaceId, serviceName,
                instance);
        // 构造当前服务节点的参数信息
        final Map params = new HashMap(16);
        params.put(CommonParams.NAMESPACE_ID, namespaceId); // namespace
        params.put(CommonParams.SERVICE_NAME, serviceName); // serviceName
        params.put(CommonParams.GROUP_NAME, groupName); // 分组信息
        params.put(CommonParams.CLUSTER_NAME, instance.getClusterName()); // 集群信息,默认 DEFAULT
        params.put("ip", instance.getIp()); // 当前节点ip
        params.put("port", String.valueOf(instance.getPort())); // 当前节点端口
        params.put("weight", String.valueOf(instance.getWeight())); //  当前节点权重
        params.put("enable", String.valueOf(instance.isEnabled())); 
        params.put("healthy", String.valueOf(instance.isHealthy())); //  当前节点健康状态
        params.put("ephemeral", String.valueOf(instance.isEphemeral())); //  是否临时节点
        params.put("metadata", JacksonUtils.toJson(instance.getmetadata())); // 元数据
        
        // 执行请求
        reqApi(UtilAndComs.nacosUrlInstance, params, HttpMethod.POST);        
    }
	
	// 发起请求
	public String reqApi(String api, Map params, Map body, List servers,
            String method) throws NacosException {
        
        params.put(CommonParams.NAMESPACE_ID, getNamespaceId());        
        if (CollectionUtils.isEmpty(servers) && StringUtils.isBlank(nacosDomain)) {
            throw new NacosException(NacosException.INVALID_PARAM, "no server available");
        }
        
        NacosException exception = new NacosException();        
        if (StringUtils.isNotBlank(nacosDomain)) {
            for (int i = 0; i < maxRetry; i++) { // 重试机制
                try {
                	 #### http调用nacos服务端进行服务注册
                    return callServer(api, params, body, nacosDomain, method);
                } catch (NacosException e) {
                    exception = e;
                    if (NAMING_LOGGER.isDebugEnabled()) {
                        NAMING_LOGGER.debug("request {} failed.", nacosDomain, e);
                    }
                }
            }
        }  
        NAMING_LOGGER.error("request: {} failed, servers: {}, code: {}, msg: {}", api, servers, exception.getErrCode(),
                exception.getErrMsg());        
        throw new NacosException(exception.getErrCode(),
                "failed to req API:" + api + " after all servers(" + servers + ") tried: " + exception.getMessage());        
    }
	
	public String callServer(String api, Map params, Map body, String curServer,
            String method) throws NacosException {
        long start = System.currentTimeMillis();
        long end = 0;
        injectSecurityInfo(params);
        Header header = builderHeader();        
        String url;
        if (curServer.startsWith(UtilAndComs.HTTPS) || curServer.startsWith(UtilAndComs.HTTP)) {
            url = curServer + api;
        } else {
            if (!IPUtil.containsPort(curServer)) {
                curServer = curServer + IPUtil.IP_PORT_SPLITER + serverPort;
            }
            url = NamingHttpClientManager.getInstance().getPrefix() + curServer + api;
        }        
        try {
        	// ## 使用 restTemplate 方式请求 nacos服务端 进行服务注册
            HttpRestResult restResult = nacosRestTemplate
                    .exchangeForm(url, header, Query.newInstance().initParams(params), body, method, String.class);
            end = System.currentTimeMillis();            
            MetricsMonitor.getNamingRequestMonitor(method, url, String.valueOf(restResult.getCode()))
                    .observe(end - start);            
            if (restResult.ok()) {
                return restResult.getData();
            }
            if (HttpStatus.SC_NOT_MODIFIED == restResult.getCode()) {
                return StringUtils.EMPTY;
            }
            throw new NacosException(restResult.getCode(), restResult.getMessage());
        } catch (Exception e) {
            NAMING_LOGGER.error("[NA] failed to request", e);
            throw new NacosException(NacosException.SERVER_ERROR, e);
        }
    }
}

callServer 的参数信息

到这里,服务注册就完成了,可以在nacos注册中心的服务管理下面,看到注册的服务信息。

nacos客户端心跳机制 BeatReactor

NacosNamingService 在初始化时创建了 BeatReactor

/**
 * Naming service client (excerpt): shows the collaborators it owns and how
 * {@code init} creates them.
 */
public class NacosNamingService implements NamingService {    
    private HostReactor hostReactor; // host reactor: periodically pulls the service list and maintains the UDP push receiver
    private BeatReactor beatReactor; // beat reactor: reports heartbeats to the server
    private NamingProxy serverProxy; // registry server proxy: communicates with the registry server
        
    /**
     * Runs the initialization steps below in order and then creates the server
     * proxy, the beat reactor and the host reactor.
     *
     * @param properties client configuration passed to each initialization step
     * @throws NacosException declared by the method; which step raises it is not visible in this excerpt
     */
    private void init(Properties properties) throws NacosException {
        ValidatorUtils.checkInitParam(properties);
        this.namespace = InitUtils.initNamespaceForNaming(properties);
        InitUtils.initSerialization();
        initServerAddr(properties);
        InitUtils.initWebRootContext(properties);
        initCacheDir();
        initLogName(properties);
        
        // Creation order matters: the proxy is passed to both reactors, and the
        // beat reactor is passed to the host reactor.
        this.serverProxy = new NamingProxy(this.namespace, this.endpoint, this.serverList, properties);
        this.beatReactor = new BeatReactor(this.serverProxy, initClientBeatThreadCount(properties));
        this.hostReactor = new HostReactor(this.serverProxy, beatReactor, this.cacheDir, 
        		isLoadCacheAtStart(properties),
                isPushEmptyProtect(properties), 
                initPollingThreadCount(properties));
    }		
}

BeatReactor 代码分析

/**
 * Sends periodic heartbeats for registered instances (excerpt). Each heartbeat
 * is a one-shot scheduled task that schedules its own successor.
 */
public class BeatReactor implements Closeable {

    /**
     * Creates the reactor and its scheduler.
     *
     * @param serverProxy proxy used to send heartbeats and re-registrations
     * @param threadCount core pool size of the scheduler
     */
    public BeatReactor(NamingProxy serverProxy, int threadCount) {
        this.serverProxy = serverProxy;
        // Scheduled pool of daemon threads dedicated to sending heartbeats.
        this.executorService = new ScheduledThreadPoolExecutor(threadCount, new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r);
                thread.setDaemon(true);
                thread.setName("com.alibaba.nacos.naming.beat.sender");
                return thread;
            }
        });
    }

    /**
     * Registers a heartbeat for an instance; called by NacosNamingService during
     * service registration.
     *
     * @param serviceName grouped service name
     * @param beatInfo    heartbeat payload and period
     */
    public void addBeatInfo(String serviceName, BeatInfo beatInfo) {
        NAMING_LOGGER.info("[BEAT] adding beat: {} to beat map.", beatInfo);
        String key = buildKey(serviceName, beatInfo.getIp(), beatInfo.getPort());
        BeatInfo existBeat = null;
        //fix #1733
        // Stop a previously registered beat for the same key before replacing it.
        if ((existBeat = dom2Beat.remove(key)) != null) {
            existBeat.setStopped(true);
        }
        dom2Beat.put(key, beatInfo);
        // Schedule the first heartbeat after one period.
        executorService.schedule(new BeatTask(beatInfo), beatInfo.getPeriod(), TimeUnit.MILLISECONDS);
        MetricsMonitor.getDom2BeatSizeMonitor().set(dom2Beat.size());
    }

    /**
     * One heartbeat round: sends the beat, adopts the interval returned by the
     * server, re-registers the instance if the server does not know it, and
     * schedules the next round.
     */
    class BeatTask implements Runnable {
        BeatInfo beatInfo;

        public BeatTask(BeatInfo beatInfo) {
            this.beatInfo = beatInfo;
        }

        @Override
        public void run() {
            // A stopped beat ends the chain: nothing is sent and nothing is rescheduled.
            if (beatInfo.isStopped()) {
                return;
            }
            long nextTime = beatInfo.getPeriod();
            try {
                // Send the heartbeat.
                JsonNode result = serverProxy.sendBeat(beatInfo, BeatReactor.this.lightBeatEnabled);
                long interval = result.get("clientBeatInterval").asLong();
                boolean lightBeatEnabled = false;
                if (result.has(CommonParams.LIGHT_BEAT_ENABLED)) {
                    lightBeatEnabled = result.get(CommonParams.LIGHT_BEAT_ENABLED).asBoolean();
                }
                BeatReactor.this.lightBeatEnabled = lightBeatEnabled;
                // A positive interval from the server overrides the local period.
                if (interval > 0) {
                    nextTime = interval;
                }
                int code = NamingResponseCode.OK;
                if (result.has(CommonParams.CODE)) {
                    code = result.get(CommonParams.CODE).asInt();
                }
                // The server does not know this instance yet: register it again.
                if (code == NamingResponseCode.RESOURCE_NOT_FOUND) {
                    Instance instance = new Instance();
                    instance.setPort(beatInfo.getPort());
                    instance.setIp(beatInfo.getIp());
                    instance.setWeight(beatInfo.getWeight());
                    instance.setMetadata(beatInfo.getMetadata());
                    instance.setClusterName(beatInfo.getCluster());
                    instance.setServiceName(beatInfo.getServiceName());
                    // Assigns the instance's own id back to itself; kept as in the excerpt.
                    instance.setInstanceId(instance.getInstanceId());
                    instance.setEphemeral(true);
                    try {
                        serverProxy.registerService(beatInfo.getServiceName(),
                                NamingUtils.getGroupName(beatInfo.getServiceName()), instance);
                    } catch (Exception ignore) {
                        // Deliberately best-effort: a failed re-registration is dropped here.
                    }
                }
            } catch (NacosException ex) {
                NAMING_LOGGER.error("[CLIENT-BEAT] failed to send beat: {}, code: {}, msg: {}",
                        JacksonUtils.toJson(beatInfo), ex.getErrCode(), ex.getErrMsg());
            }
            // After the round completes, schedule the next heartbeat.
            // NOTE: only NacosException is caught above, so any other exception thrown
            // inside the try block propagates and this line is not reached.
            executorService.schedule(new BeatTask(beatInfo), nextTime, TimeUnit.MILLISECONDS);
        }
    }
}

心跳发送的数据

定时拉取服务信息 HostReactor
/**
 * Keeps the client-side cache of service information up to date (excerpt): it
 * pulls service lists on a schedule and owns the UDP push receiver.
 */
public class HostReactor implements Closeable {

    // Local service cache. The type arguments are required: with a raw Map the
    // lookups in UpdateTask could not be assigned to ServiceInfo.
    private final Map<String, ServiceInfo> serviceInfoMap;

    /**
     * Creates the reactor, its scheduler, the local cache and the push receiver.
     *
     * @param serverProxy         proxy used to query the server
     * @param beatReactor         heartbeat reactor
     * @param cacheDir            directory of the on-disk cache
     * @param loadCacheAtStart    when true the cache is seeded from {@code DiskCache.read(cacheDir)}
     * @param pushEmptyProtection stored as-is in this excerpt
     * @param pollingThreadCount  core pool size of the update scheduler
     */
    public HostReactor(NamingProxy serverProxy, BeatReactor beatReactor, String cacheDir, boolean loadCacheAtStart,
            boolean pushEmptyProtection, int pollingThreadCount) {
        // init executorService
        // Scheduled pool of daemon threads that runs the update tasks.
        this.executor = new ScheduledThreadPoolExecutor(pollingThreadCount, new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r);
                thread.setDaemon(true);
                thread.setName("com.alibaba.nacos.client.naming.updater");
                return thread;
            }
        });
        // Heartbeat reactor.
        this.beatReactor = beatReactor;
        this.serverProxy = serverProxy;
        this.cacheDir = cacheDir;
        if (loadCacheAtStart) {
            this.serviceInfoMap = new ConcurrentHashMap<String, ServiceInfo>(DiskCache.read(this.cacheDir));
        } else {
            this.serviceInfoMap = new ConcurrentHashMap<String, ServiceInfo>(16);
        }
        this.pushEmptyProtection = pushEmptyProtection;
        this.updatingMap = new ConcurrentHashMap<String, Object>();
        // Failover handling backed by the local cache directory
        // (per the original author's note; its implementation is not shown here).
        this.failoverReactor = new FailoverReactor(this, cacheDir);
        // Important: starts the UDP receiver for server pushes.
        this.pushReceiver = new PushReceiver(this);
        this.notifier = new InstancesChangeNotifier();

        NotifyCenter.registerToPublisher(InstancesChangeEvent.class, 16384);
        NotifyCenter.registerSubscriber(notifier);
    }

    /**
     * Refreshes one service from the server.
     *
     * @param serviceName service to refresh
     * @param clusters    cluster filter
     * @throws NacosException if the query fails
     */
    public void updateService(String serviceName, String clusters) throws NacosException {
        ServiceInfo oldService = getServiceInfo0(serviceName, clusters);
        try {
            // Query the server for the service list; the UDP port of the push
            // receiver is sent along with the query.
            String result = serverProxy.queryList(serviceName, clusters, pushReceiver.getUdpPort(), false);
            // Process the result and store it in the serviceInfoMap cache.
            if (StringUtils.isNotEmpty(result)) {
                processServiceJson(result);
            }
        } finally {
            // Wake up any thread waiting on the previous ServiceInfo object.
            if (oldService != null) {
                synchronized (oldService) {
                    oldService.notifyAll();
                }
            }
        }
    }

    /**
     * Periodic refresh of a single service. Every run reschedules itself in the
     * {@code finally} block with a back-off based on the failure count.
     */
    public class UpdateTask implements Runnable {
        long lastRefTime = Long.MAX_VALUE;
        private final String clusters;
        private final String serviceName;

        private int failCount = 0;

        public UpdateTask(String serviceName, String clusters) {
            this.serviceName = serviceName;
            this.clusters = clusters;
        }

        // Increments the failure counter, saturating at 6.
        private void incFailCount() {
            int limit = 6;
            if (failCount == limit) {
                return;
            }
            failCount++;
        }

        private void resetFailCount() {
            failCount = 0;
        }

        @Override
        public void run() {
            // 10s by default (per the original author's note).
            long delayTime = DEFAULT_DELAY;
            try {
                ServiceInfo serviceObj = serviceInfoMap.get(ServiceInfo.getKey(serviceName, clusters));
                if (serviceObj == null) { // not cached locally: pull from the server and return
                    updateService(serviceName, clusters);
                    return;
                }
                // No push received since the last run: refresh the service list.
                if (serviceObj.getLastRefTime() <= lastRefTime) {
                    updateService(serviceName, clusters);
                    serviceObj = serviceInfoMap.get(ServiceInfo.getKey(serviceName, clusters));
                } else {
                    // if serviceName already updated by push, we should not override it
                    // since the push data may be different from pull through force push
                    // A push arrived in the meantime, so the cached data is not replaced.
                    refreshOnly(serviceName, clusters);
                }
                lastRefTime = serviceObj.getLastRefTime();
                if (!notifier.isSubscribed(serviceName, clusters) && !futureMap
                        .containsKey(ServiceInfo.getKey(serviceName, clusters))) {
                    // abort the update task
                    // NOTE: this return is inside the try block, so the finally block
                    // below still reschedules the task.
                    NAMING_LOGGER.info("update task is stopped, service:" + serviceName + ", clusters:" + clusters);
                    return;
                }
                if (CollectionUtils.isEmpty(serviceObj.getHosts())) {
                    incFailCount();
                    return;
                }
                delayTime = serviceObj.getCacheMillis(); // 10s
                resetFailCount();
            } catch (Throwable e) {
                incFailCount();
                NAMING_LOGGER.warn("[NA] failed to update serviceName: " + serviceName, e);
            } finally {
                // Schedule the next pull of the service list.
                // Back-off: the delay doubles per counted failure (delayTime << failCount)
                // and is capped at DEFAULT_DELAY * 60.
                executor.schedule(this, Math.min(delayTime << failCount, DEFAULT_DELAY * 60), TimeUnit.MILLISECONDS);
            }
        }
    }
}

刷新服务列表时,会向服务端发送 UDP 推送相关信息:客户端用于接收推送的 UDP 端口

接收服务推送 PushReceiver Udp服务端
public class PushReceiver implements Runnable, Closeable {
    public PushReceiver(HostReactor hostReactor) {
        try {
            this.hostReactor = hostReactor;
            // 会随机绑定一个可用的端口,作为udp的推送端口
            this.udpSocket = new DatagramSocket();
            // 一个线程的线程池
            this.executorService = new ScheduledThreadPoolExecutor(1, new ThreadFactory() {
                @Override
                public Thread newThread(Runnable r) {
                    Thread thread = new Thread(r);
                    thread.setDaemon(true);
                    thread.setName("com.alibaba.nacos.naming.push.receiver");
                    return thread;
                }
            });
            // 启动当前线程,执行run方法
            this.executorService.execute(this);
        } catch (Exception e) {
            NAMING_LOGGER.error("[NA] init udp socket failed", e);
        }
    }    
    @Override
    public void run() {
        while (!closed) {
            try {                
                // byte[] is initialized with 0 full filled by default
                byte[] buffer = new byte[UDP_MSS];
                DatagramPacket packet = new DatagramPacket(buffer, buffer.length);
                // udp方式接收服务发送过来的数据
                udpSocket.receive(packet);
                
                String json = new String(IoUtils.tryDecompress(packet.getData()), UTF_8).trim();
                NAMING_LOGGER.info("received push data: " + json + " from " + packet.getAddress().toString());
                
                PushPacket pushPacket = JacksonUtils.toObj(json, PushPacket.class);
                String ack;
                // 如果dom变更,或者服务变更
                if ("dom".equals(pushPacket.type) || "service".equals(pushPacket.type)) {
                    // 更新服务数据
                    hostReactor.processServiceJson(pushPacket.data);
                    
                    // send ack to server
                    ack = "{"type": "push-ack"" + ", "lastRefTime":"" + pushPacket.lastRefTime + "", "data":"
                            + """}";
                } else if ("dump".equals(pushPacket.type)) {
                    // dump data to server
                    ack = "{"type": "dump-ack"" + ", "lastRefTime": "" + pushPacket.lastRefTime + "", "data":"
                            + """ + StringUtils.escapeJavascript(JacksonUtils.toJson(hostReactor.getServiceInfoMap()))
                            + ""}";
                } else {
                    // do nothing send ack only
                    ack = "{"type": "unknown-ack"" + ", "lastRefTime":"" + pushPacket.lastRefTime
                            + "", "data":" + """}";
                }
                // 发送响应
                udpSocket.send(new DatagramPacket(ack.getBytes(UTF_8), ack.getBytes(UTF_8).length,
                        packet.getSocketAddress()));
            } catch (Exception e) {
                if (closed) {
                    return;
                }
                NAMING_LOGGER.error("[NA] error while receiving push data", e);
            }
        }
    }
    
}

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5706850.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-18

发表评论

登录后才能评论

评论列表(0条)

保存