MQ消息可靠性
队列持久化
默认生成的交换机,队列和消息都是持久化的
消费者消息确认
使用auto 目前的配置可能导致消息一直重试
失败重试机制
死信交换机
DelayExchange插件
惰性队列
MQ集群
nacos分级存储模型
nacos注册
@CanDistro @PostMapping @Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE) public String register(HttpServletRequest request) throws Exception { // 尝试获取namespaceId final String namespaceId = WebUtils .optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID); // 尝试获取serviceName,其格式为 group_name@@service_name final String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME); NamingUtils.checkServiceNameFormat(serviceName); // 解析出实例信息,封装为Instance对象 final Instance instance = parseInstance(request); // 注册实例 serviceManager.registerInstance(namespaceId, serviceName, instance); return "ok"; } public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException { // 创建一个空的service(如果是第一次来注册实例,要先创建一个空service出来,放入注册表) // 此时不包含实例信息 createEmptyService(namespaceId, serviceName, instance.isEphemeral()); ==> public void createServiceIfAbsent(String namespaceId, String serviceName, boolean local, Cluster cluster) throws NacosException { //尝试从注册表获取服务 Service service = getService(namespaceId, serviceName); if (service == null) { Loggers.SRV_LOG.info("creating empty service {}:{}", namespaceId, serviceName); service = new Service(); service.setName(serviceName); service.setNamespaceId(namespaceId); service.setGroupName(NamingUtils.getGroupName(serviceName)); // now validate the service. if failed, exception will be thrown // 记录服务最后一次更新时间 service.setLastModifiedMillis(System.currentTimeMillis()); service.recalculateChecksum(); if (cluster != null) { cluster.setService(service); service.getClusterMap().put(cluster.getName(), cluster); } service.validate(); // 初始化空服务 putServiceAndInit(service); ==》 private void putServiceAndInit(Service service) throws NacosException { // 把服务放入注册表 putService(service); service = getService(service.getNamespaceId(), service.getName()); // 初始化,健康检测 service.init(); // 服务状态变更监听 consistencyService .listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), true), service); consistencyService .listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), false), service); Loggers.SRV_LOG.info("[NEW-SERVICE] {}", service.toJson()); } ==》 if (!local) { addOrReplaceService(service); } } ==> // 拿到创建好的service Service service = getService(namespaceId, serviceName); // 拿不到则抛异常 if (service == null) { throw new NacosException(NacosException.INVALID_PARAM, "service not found, namespace: " + namespaceId + ", service: " + serviceName); } // 添加要注册的实例到service中 addInstance(namespaceId, serviceName, instance.isEphemeral(), instance); } == 》 public void addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips) throws NacosException { // 监听服务列表用到的key,服务唯一标识,例如:com.alibaba.nacos.naming.iplist.ephemeral.public##DEFAULT_GROUP@@order-service String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral); // 获取服务 Service service = getService(namespaceId, serviceName); // 同步锁,避免并发修改的安全问题,同一个服务可能有多个实例,多个实例只能串行执行 synchronized (service) { // 1)获取要更新的实例列表==》 拷贝注册表中旧的实例列表+新注册的实例 得到最终实例列表 ListinstanceList = addIpAddresses(service, ephemeral, ips); // 2)封装实例列表到Instances对象 Instances instances = new Instances(); instances.setInstanceList(instanceList); // 3)完成 注册表更新 以及 Nacos集群的数据同步 consistencyService.put(key, instances); // 这里会判断是临时实例还是永久实例,默认是临时的,永久节点使用强一致性 ==》 @Override public void put(String key, Record value) throws NacosException { // 更新本地注册表 onPut(key, value); ==> public void onPut(String key, Record value) { // 判断是不是临时实例 if (KeyBuilder.matchEphemeralInstanceListKey(key)) { // 封装实例列表到Datum Datum datum = new Datum<>(); datum.value = (Instances) value; // 唯一标识 datum.key = key; datum.timestamp.incrementAndGet(); //serviceId为Key,datum为值 缓存起来 dataStore.put(key, datum); } if (!listeners.containsKey(key)) { return; } // 把serviceId和当前 *** 作类型存入notifer notifier.addTask(key, DataOperation.CHANGE); ==> public void addTask(String datumKey, DataOperation action) { if (services.containsKey(datumKey) && action == DataOperation.CHANGE) { return; } if (action == DataOperation.CHANGE) { services.put(datumKey, StringUtils.EMPTY); } // private BlockingQueue > tasks = new ArrayBlockingQueue<>(1024 * 1024); //把serviceId和事件 放入阻塞队列 tasks.offer(Pair.with(datumKey, action)); ==》 @Override public void run() { Loggers.DISTRO.info("distro notifier started"); for (; ; ) { try { //队列放一个任务,我才更新列表 Pair pair = tasks.take(); handle(pair); } catch (Throwable e) { Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e); } } } ==》 } ==> } ==> // If upgrade to 2.0.X, do not sync for v1. if (ApplicationUtils.getBean(UpgradeJudgement.class).isUseGrpcFeatures()) { return; } // 异步,同步给nacos其他节点 distroProtocol.sync(new DistroKey(key, KeyBuilder.INSTANCE_LIST_KEY_PREFIX), DataOperation.CHANGE, DistroConfig.getInstance().getSyncDelayMillis()); ==》 // 通知其他节点 for (Member each : memberManager.allMembersWithoutSelf()) { DistroKey distroKeyWithTarget = new DistroKey(distroKey.getResourceKey(), distroKey.getResourceType(), targetServer); DistroDelayTask distroDelayTask = new DistroDelayTask(distroKeyWithTarget, action, delay); distroTaskEngineHolder.getDelayTaskExecuteEngine().addTask(distroKeyWithTarget, distroDelayTask); if (Loggers.DISTRO.isDebugEnabled()) { Loggers.DISTRO.debug("[DISTRO-SCHEDULE] {} to {}", distroKey, targetServer); } } ==> } ==》 } }
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)