微服务----2

微服务----2,第1张

微服务----2 MQ

MQ消息可靠性




队列持久化

默认生成的交换机,队列和消息都是持久化的

消费者消息确认

使用auto 目前的配置可能导致消息一直重试

失败重试机制



死信交换机


DelayExchange插件

惰性队列



MQ集群



面试篇

nacos分级存储模型

nacos注册

 @CanDistro
    @PostMapping
    @Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
    public String register(HttpServletRequest request) throws Exception {
        
	// 尝试获取namespaceId
    final String namespaceId = WebUtils
        .optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
    // 尝试获取serviceName,其格式为 group_name@@service_name
    final String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
    NamingUtils.checkServiceNameFormat(serviceName);
	// 解析出实例信息,封装为Instance对象
    final Instance instance = parseInstance(request);
	// 注册实例
    serviceManager.registerInstance(namespaceId, serviceName, instance);
    return "ok";
    }

public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {
	// 创建一个空的service(如果是第一次来注册实例,要先创建一个空service出来,放入注册表)
    // 此时不包含实例信息
    createEmptyService(namespaceId, serviceName, instance.isEphemeral());
==>
public void createServiceIfAbsent(String namespaceId, String serviceName, boolean local, Cluster cluster)
            throws NacosException {
            //尝试从注册表获取服务
        Service service = getService(namespaceId, serviceName);
        if (service == null) {
            
            Loggers.SRV_LOG.info("creating empty service {}:{}", namespaceId, serviceName);
            service = new Service();
            service.setName(serviceName);
            service.setNamespaceId(namespaceId);
            service.setGroupName(NamingUtils.getGroupName(serviceName));
            // now validate the service. if failed, exception will be thrown
            // 记录服务最后一次更新时间
            service.setLastModifiedMillis(System.currentTimeMillis());
            service.recalculateChecksum();
            if (cluster != null) {
                cluster.setService(service);
                service.getClusterMap().put(cluster.getName(), cluster);
            }
            service.validate();
            // 初始化空服务
            putServiceAndInit(service);
            ==》
				            private void putServiceAndInit(Service service) throws NacosException {
				        // 把服务放入注册表
				        putService(service);
				        service = getService(service.getNamespaceId(), service.getName());
				        // 初始化,健康检测
				        service.init();
				        // 服务状态变更监听
				        consistencyService
				                .listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), true), service);
				        consistencyService
				                .listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), false), service);
				        Loggers.SRV_LOG.info("[NEW-SERVICE] {}", service.toJson());
				    }
            ==》
            if (!local) {
                addOrReplaceService(service);
            }
        }

==>
    // 拿到创建好的service
    Service service = getService(namespaceId, serviceName);
    // 拿不到则抛异常
    if (service == null) {
        throw new NacosException(NacosException.INVALID_PARAM,
                                 "service not found, namespace: " + namespaceId + ", service: " + serviceName);
    }
    // 添加要注册的实例到service中
    addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);
}

== 》 


public void addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips)
    throws NacosException {
	// 监听服务列表用到的key,服务唯一标识,例如:com.alibaba.nacos.naming.iplist.ephemeral.public##DEFAULT_GROUP@@order-service
    String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral);
    // 获取服务
    Service service = getService(namespaceId, serviceName);
    // 同步锁,避免并发修改的安全问题,同一个服务可能有多个实例,多个实例只能串行执行
    synchronized (service) {
        // 1)获取要更新的实例列表==》 拷贝注册表中旧的实例列表+新注册的实例 得到最终实例列表
        List instanceList = addIpAddresses(service, ephemeral, ips);
		// 2)封装实例列表到Instances对象
        Instances instances = new Instances();
        instances.setInstanceList(instanceList);
		// 3)完成 注册表更新 以及 Nacos集群的数据同步
        consistencyService.put(key, instances);
       // 这里会判断是临时实例还是永久实例,默认是临时的,永久节点使用强一致性
        ==》
	            @Override
	    public void put(String key, Record value) throws NacosException {
	    	// 更新本地注册表
	        onPut(key, value);
			==>
					 public void onPut(String key, Record value) {
		        // 判断是不是临时实例
		        if (KeyBuilder.matchEphemeralInstanceListKey(key)) {
		            // 封装实例列表到Datum
		            Datum datum = new Datum<>();
		            datum.value = (Instances) value;
		            // 唯一标识
		            datum.key = key;
		            datum.timestamp.incrementAndGet();
		            //serviceId为Key,datum为值 缓存起来
		            dataStore.put(key, datum);
		        }
		        
		        if (!listeners.containsKey(key)) {
		            return;
		        }
		        // 把serviceId和当前 *** 作类型存入notifer
		        notifier.addTask(key, DataOperation.CHANGE);
		        	==>
					        	  public void addTask(String datumKey, DataOperation action) {
			            
			            if (services.containsKey(datumKey) && action == DataOperation.CHANGE) {
			                return;
			            }
			            if (action == DataOperation.CHANGE) {
			                services.put(datumKey, StringUtils.EMPTY);
			            }
             // private BlockingQueue> tasks = new ArrayBlockingQueue<>(1024 * 1024);
             			//把serviceId和事件  放入阻塞队列
			            tasks.offer(Pair.with(datumKey, action));

						==》
						        @Override
						        public void run() {
						            Loggers.DISTRO.info("distro notifier started");
						            
						            for (; ; ) {
						                try {
						                //队列放一个任务,我才更新列表
						                    Pair pair = tasks.take();
						                    handle(pair);
						                } catch (Throwable e) {
						                    Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e);
						                }
						            }
        } 
						==》
			        }
		        	==>
		    }
			==>

	        // If upgrade to 2.0.X, do not sync for v1.
	        if (ApplicationUtils.getBean(UpgradeJudgement.class).isUseGrpcFeatures()) {
	            return;
	        }
	        // 异步,同步给nacos其他节点
	        distroProtocol.sync(new DistroKey(key, KeyBuilder.INSTANCE_LIST_KEY_PREFIX), DataOperation.CHANGE,
	                DistroConfig.getInstance().getSyncDelayMillis());
	                ==》
	                // 通知其他节点
	                        for (Member each : memberManager.allMembersWithoutSelf()) {
			            DistroKey distroKeyWithTarget = new DistroKey(distroKey.getResourceKey(), distroKey.getResourceType(),
			                targetServer);
			        DistroDelayTask distroDelayTask = new DistroDelayTask(distroKeyWithTarget, action, delay);
			        distroTaskEngineHolder.getDelayTaskExecuteEngine().addTask(distroKeyWithTarget, distroDelayTask);
			        if (Loggers.DISTRO.isDebugEnabled()) {
			            Loggers.DISTRO.debug("[DISTRO-SCHEDULE] {} to {}", distroKey, targetServer);
			        }
        }
	                ==>
	    }
        ==》
    }
}


欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5702148.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存