cloud Alibaba Nacos配置中心源码(3) 长轮询设计

cloud Alibaba Nacos配置中心源码(3) 长轮询设计,第1张

     

目录

1、多线程任务获取配置并对比

1.1 checkConfigInfo

1.2 LongPollingRunnable.run

1.2.1 checkLocalConfig

1.2.2 Check From Server

1.3 cacheData 变化通知

1.3.1 safeNotifyListener

2、长轮询


        我们前面说过了,配置中心的变化是由客户端发起http连接请求server去获取配置并和本地进行对比后更新本地文件以及class属性。下面从两方面说下nacos config的客户端是如何去获取配置和设计原理。

1、多线程任务获取配置并对比

        面试的时候肯定会有人问:nacos client和server 配置变动是通过 拉还是推 获取?先说答案: 是拉。

        前面说过当加载nacos ,会构建clientWorker,clientWorker内部就是定义了线程池运行线程进行对比配置文件。

        this.executor = Executors.newScheduledThreadPool(1, new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread t = new Thread(r);
                t.setName("com.alibaba.nacos.client.Worker." + agent.getName());
                t.setDaemon(true);
                return t;
            }
        });
         //线程池只有1个核心线程,每10毫秒运行任务
         this.executor.scheduleWithFixedDelay(new Runnable() {
            @Override
            public void run() {
                try {
                    checkConfigInfo();
                } catch (Throwable e) {
                    LOGGER.error("[" + agent.getName() + "] [sub-check] rotate check error", e);
                }
            }
        }, 1L, 10L, TimeUnit.MILLISECONDS);
        
        //定义线程池,核心线程为电脑核心数。用于在checkConfigInfo中运行长轮询任务,使用两个线程    
         //池,最大化的将配置文件对比任务时间拉短。
        this.executorService = Executors
                .newScheduledThreadPool(Runtime.getRuntime().availableProcessors(), new ThreadFactory() {
                    @Override
                    public Thread newThread(Runnable r) {
                        Thread t = new Thread(r);
                        t.setName("com.alibaba.nacos.client.Worker.longPolling." + agent.getName());
                        t.setDaemon(true);
                        return t;
                    }
                });
1.1 checkConfigInfo
public void checkConfigInfo() {
        
        int listenerSize = cacheMap.size();
        // 将当前监控的配置分批次任务运行,每批最多3000,并向上取整
        int longingTaskCount = (int) Math.ceil(listenerSize / ParamUtil.getPerTaskConfigSize());
        //currentLongingTaskCount 记录 当前运行的任务数,也就是不会重复去运行里面的方法重复创建任务,只有当cacheMap数量增加,且超过了上次的/3000的数后,才会在创建LongPollingRunnable执行
        if (longingTaskCount > currentLongingTaskCount) {
            for (int i = (int) currentLongingTaskCount; i < longingTaskCount; i++) {
                // The task list is no order.So it maybe has issues when changing.
                //我理解:因为监控的配置并没有顺序,所以在做配置文件时要注意,不要存在重复的key
                executorService.execute(new LongPollingRunnable(i));
            }
            currentLongingTaskCount = longingTaskCount;
        }
    }

        cacheMap是线程安全的ConcurrentHashMap()。不会受多线程 *** 作而导致混乱。

       这里的意思其实就是当我配置文件小于3000个时,就指挥启动一个 longPollingRunnable线程任务,而executorService.execute 是只运行一次的,而当运行一次任务完成后会使用executorService再次运行,这块代码在longPollingRunnable 最后处。

1.2 LongPollingRunnable.run

        分成三步

1.2.1 checkLocalConfig
                //轮询cacheMap,挑选出符合当前taskId运行的cacheData数据
                //cacheData的taskId是在com.alibaba.nacos.client.config.impl.ClientWorker#addCacheDataIfAbsent(java.lang.String, java.lang.String)设置的。
                for (CacheData cacheData : cacheMap.values()) {
                    if (cacheData.getTaskId() == taskId) {
                        cacheDatas.add(cacheData);
                        try {
                            //判断本地文件中的内容是否有变化,有变化将发起刷新 *** 作
                            checkLocalConfig(cacheData);
                            if (cacheData.isUseLocalConfigInfo()) {
                                cacheData.checkListenerMd5();
                            }
                        } catch (Exception e) {
                            LOGGER.error("get local config info error", e);
                        }
                    }
                }
1.2.2 Check From Server
                //nacos server 对外提供 /v1/cs/configs/listener api监听哪些配置发送了变化
                List changedGroupKeys = checkUpdateDataIds(cacheDatas, inInitializingCacheList);
                if (!CollectionUtils.isEmpty(changedGroupKeys)) {
                    LOGGER.info("get changedGroupKeys:" + changedGroupKeys);
                }
                //遍历发生变化的key
                for (String groupKey : changedGroupKeys) {
                    String[] key = GroupKey.parseKey(groupKey);
                    String dataId = key[0];
                    String group = key[1];
                    String tenant = null;
                    if (key.length == 3) {
                        tenant = key[2];
                    }
                    try {
                        //通过http方法调用 v1/cs/configs 获取内容,并将内容保存本地文件系统
                        String[] ct = getServerConfig(dataId, group, tenant, 3000L);
                        CacheData cache = cacheMap.get(GroupKey.getKeyTenant(dataId, group, tenant));
                        cache.setContent(ct[0]);
                        if (null != ct[1]) {
                            cache.setType(ct[1]);
                        }
                        LOGGER.info("[{}] [data-received] dataId={}, group={}, tenant={}, md5={}, content={}, type={}",
                                agent.getName(), dataId, group, tenant, cache.getMd5(),
                                ContentUtils.truncateContent(ct[0]), ct[1]);
                    } catch (NacosException ioe) {
                        String message = String
                                .format("[%s] [get-update] get changed config exception. dataId=%s, group=%s, tenant=%s",
                                        agent.getName(), dataId, group, tenant);
                        LOGGER.error(message, ioe);
                    }
                }
                for (CacheData cacheData : cacheDatas) {
                    if (!cacheData.isInitializing() || inInitializingCacheList
                            .contains(GroupKey.getKeyTenant(cacheData.dataId, cacheData.group, cacheData.tenant))) {
                        //内部将对比当前md5和上一次内容的md5 如果不同将发起变化通知
                        cacheData.checkListenerMd5();
                        cacheData.setInitializing(false);
                    }
                }
                inInitializingCacheList.clear();
                //这里就是之前说的 当本次执行任务完成后,会继续仍到线程池运行
                executorService.execute(this);
1.3 cacheData 变化通知

        上面说过,当有本地文件的配置变化或者从server获取最新配置后都会执行checkListenerMd5方法。

void checkListenerMd5() {
        for (ManagerListenerWrap wrap : listeners) {
            if (!md5.equals(wrap.lastCallMd5)) {
                safeNotifyListener(dataId, group, content, type, md5, wrap);
            }
        }
    }

        1)listeners是对象属性,他的设置是在com.alibaba.cloud.nacos.refresh.NacosContextRefresher#registerNacosListener方法内。

        2)对象有一个 lastCallMd5属性,这个属性是初始化时会进行设置,且当有变化后会重置

1.3.1 safeNotifyListener

        这里也是异步执行的

private void safeNotifyListener(final String dataId, final String group, final String     
    content, final String type,
            final String md5, final ManagerListenerWrap listenerWrap) {
        final Listener listener = listenerWrap.listener;
        
        Runnable job = new Runnable() {
            @Override
            public void run() {
                ClassLoader myClassLoader = Thread.currentThread().getContextClassLoader();
                ClassLoader appClassLoader = listener.getClass().getClassLoader();
                try {
                    if (listener instanceof AbstractSharedListener) {
                        AbstractSharedListener adapter = (AbstractSharedListener) listener;
                        adapter.fillContext(dataId, group);
                        LOGGER.info("[{}] [notify-context] dataId={}, group={}, md5={}", name, dataId, group, md5);
                    }
                    // 执行回调之前先将线程classloader设置为具体webapp的classloader,以免回调方法中调用spi接口是出现异常或错用(多应用部署才会有该问题)。
                    Thread.currentThread().setContextClassLoader(appClassLoader);
                    
                    ConfigResponse cr = new ConfigResponse();
                    cr.setDataId(dataId);
                    cr.setGroup(group);
                    cr.setContent(content);
                    //执行过滤器链
                    configFilterChainManager.doFilter(null, cr);
                    String contentTmp = cr.getContent();
                    //这里将执行注册listener时的回调方法,会publish RefreshEvent
                    listener.receiveConfigInfo(contentTmp);
                    
                    // compare lastContent and content
                    if (listener instanceof AbstractConfigChangeListener) {
                        Map data = ConfigChangeHandler.getInstance()
                                .parseChangeData(listenerWrap.lastContent, content, type);
                        ConfigChangeEvent event = new ConfigChangeEvent(data);
                        ((AbstractConfigChangeListener) listener).receiveConfigChange(event);
                        listenerWrap.lastContent = content;
                    }
                    
                    listenerWrap.lastCallMd5 = md5;
                    LOGGER.info("[{}] [notify-ok] dataId={}, group={}, md5={}, listener={} ", name, dataId, group, md5,
                            listener);
                } catch (NacosException ex) {
                    LOGGER.error("[{}] [notify-error] dataId={}, group={}, md5={}, listener={} errCode={} errMsg={}",
                            name, dataId, group, md5, listener, ex.getErrCode(), ex.getErrMsg());
                } catch (Throwable t) {
                    LOGGER.error("[{}] [notify-error] dataId={}, group={}, md5={}, listener={} tx={}", name, dataId,
                            group, md5, listener, t.getCause());
                } finally {
                    Thread.currentThread().setContextClassLoader(myClassLoader);
                }
            }
        };
        
        final long startNotify = System.currentTimeMillis();
        try {
            if (null != listener.getExecutor()) {
                listener.getExecutor().execute(job);
            } else {
                job.run();
            }
        } catch (Throwable t) {
            LOGGER.error("[{}] [notify-error] dataId={}, group={}, md5={}, listener={} throwable={}", name, dataId,
                    group, md5, listener, t.getCause());
        }

 

 总结:

        1)clientWorker创建两个线程池,一个线程池10毫秒运行一此checkConfigInfo方法,其中内部会按照批次执行配置文件的对比,当配置文件数量不变化时,不会重复创建长轮询对比配置文件任务。另外一个线程池负责 在checkConfigInfo方法内执行长轮询任务,对比配置文件变化。

        2)LongPollingRunnable 内部 先执行内存内的cacheData(配置文件本地缓存)和本地文件系统对比,如果有变化将启动监听并执行回调。接下来会发起http调用nacos server api查询是否有配置文件变化,如果有变化也会将启动监听并执行回调。

2、长轮询

        nacos使用长轮询来尽量缩小服务端配置变化client端得到通知的时间。长轮询是servlet3.0的特性,异步化处理请求        

        其中 /v1/cs/configs/listener 接口实现长轮询请求。

         异步原理:

        

欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/web/992637.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-05-21
下一篇 2022-05-21

发表评论

登录后才能评论

评论列表(0条)

保存