Nacos源码分析05-客户端本地缓存与故障转移

Nacos源码分析05-客户端本地缓存与故障转移,第1张

Nacos源码分析05-客户端本地缓存故障转移

文章目录
  • Nacos源码分析05-客户端本地缓存与故障转移
    • 1. ServiceInfoHolder
      • 1.1. 构造器
      • 1.2. 本地缓存的写入
      • 1.3. 本地缓存目录
    • 2. 故障转移
      • 2.1. FailoverReactor
      • 2.2. SwitchRefresher
      • 2.3. DiskFileWriter
      • 2.4. FailoverFileReader
      • 2.5. ServiceInfoHolder.getServiceInfo

Nacos源码分析05-客户端本地缓存与故障转移

本系列博客,采用官方源码版本为2.0.3

  在Nacos本地缓存的时候有的时候必然会出现一些故障,这些故障就需要进行处理,涉及到的核心类为ServiceInfoHolder和FailoverReactor。

  本地缓存有两方面,第一方面是从注册中心获得实例信息会缓存在内存当中,也就是通过Map的形式承载,这样查询 *** 作都方便。第二方面便是通过磁盘文件的形式定时缓存起来,以备不时之需。

  故障转移也分两方面,第一方面是故障转移的开关是通过文件来标记的;第二方面是当开启故障转移之后,当发生故障时,可以从故障转移备份的文件中来获得服务实例信息。

1. ServiceInfoHolder

  ServiceInfoHolder类 —— 服务信息的持有者,每次客户端从注册中心获取新的服务信息时都会调用该类的processServiceInfo方法来进行处理。ServiceInfoHolder类内部做了很多工作,维护内存缓存、发布变更事件、写入文件缓存、故障转移初始化。

1.1. 构造器

  观察ServiceInfoHolder构造器初始化过程,就能看出ServiceInfoHolder的核心功能。

  ServiceInfoHolder类通过一个ConcurrentMap类型的serviceInfoMap属性来储存ServiceInfo数据。这就是Nacos客户端对服务端获取到的注册信息的第一层缓存,并且之前的课程中我们分析processServiceInfo方法时,我们已经看到,当服务信息变更时会第一时间更新ServiceInfoMap中的信息

public class ServiceInfoHolder implements Closeable {

    private final ConcurrentMap serviceInfoMap;
    
    private final FailoverReactor failoverReactor;
    
    private final boolean pushEmptyProtection;
    
    private String cacheDir;
 
    public ServiceInfoHolder(String namespace, Properties properties) {
        // 初始化本地缓存文件夹
        initCacheDir(namespace, properties);
        
        // 判断是否支持本地缓存
        if (isLoadCacheAtStart(properties)) {
            // 从磁盘中读取本地缓存,初始化内存缓存
            this.serviceInfoMap = new ConcurrentHashMap(DiskCache.read(this.cacheDir));
        } else {
            this.serviceInfoMap = new ConcurrentHashMap(16);
        }
        
        // 初始化故障反应器
        this.failoverReactor = new FailoverReactor(this, cacheDir);
        
        this.pushEmptyProtection = isPushEmptyProtect(properties);
    }
    
}
1.2. 本地缓存的写入

  在processServiceInfo方法中,当服务实例变更时会看到通过DiskCache#write方法向该目录写入ServiceInfo信息。

public ServiceInfo processServiceInfo(ServiceInfo serviceInfo) {
    
    ......
    
    if (changed) {
        NAMING_LOGGER.info("current ips:(" + serviceInfo.ipCount() + ") service: " + serviceInfo.getKey() + " -> "
                + JacksonUtils.toJson(serviceInfo.getHosts()));
        NotifyCenter.publishEvent(new InstancesChangeEvent(serviceInfo.getName(), serviceInfo.getGroupName(),
                serviceInfo.getClusters(), serviceInfo.getHosts()));
        DiskCache.write(serviceInfo, cacheDir);
    }
    return serviceInfo;
}
1.3. 本地缓存目录

  本地缓存目录cacheDir是ServiceInfoHolder的一个属性,用于指定本地缓存的根目录和故障转移的根目录。在ServiceInfoHolder的构造方法中会初始化生成缓存目录。

  initCacheDir方法就是生成缓存目录的 *** 作,默认路径:${user.home}/nacos/naming/public,也可以自定义,通过System.setProperty(“JM.SNAPSHOT.PATH”)自定义。初始化完目录之后,故障转移信息也存储在该目录下。

private void initCacheDir(String namespace, Properties properties) {
    String jmSnapshotPath = System.getProperty(JM_SNAPSHOT_PATH_PROPERTY);

    String namingCacheRegistryDir = "";
    if (properties.getProperty(PropertyKeyConst.NAMING_CACHE_REGISTRY_DIR) != null) {
        namingCacheRegistryDir = File.separator + properties.getProperty(PropertyKeyConst.NAMING_CACHE_REGISTRY_DIR);
    }

    if (!StringUtils.isBlank(jmSnapshotPath)) {
        cacheDir = jmSnapshotPath + File.separator + FILE_PATH_NACOS + namingCacheRegistryDir
            + File.separator + FILE_PATH_NAMING + File.separator + namespace;
    } else {
        cacheDir = System.getProperty(USER_HOME_PROPERTY) + File.separator + FILE_PATH_NACOS + namingCacheRegistryDir
            + File.separator + FILE_PATH_NAMING + File.separator + namespace;
    }
}
2. 故障转移 2.1. FailoverReactor

  在ServiceInfoHolder的构造方法中,还会初始化一个FailoverReactor类,同样是ServiceInfoHolder的成员变量。FailoverReactor的作用便是用来处理故障转移的。

  FailoverReactor的构造方法基本展示出了它的功能:

  1. 持有ServiceInfoHolder的引用;
  2. 拼接故障目录:${user.home}/nacos/naming/[命名空间]/failover;
  3. 初始化executorServicek,并调用init方法通过executorService开启多个定时任务执行。
public class FailoverReactor implements Closeable {
    
    private static final String FAILOVER_DIR = "/failover";
    
    private static final String IS_FAILOVER_MODE = "1";
    
    private static final String NO_FAILOVER_MODE = "0";
    
    private static final String FAILOVER_MODE_PARAM = "failover-mode";
    
    private Map serviceMap = new ConcurrentHashMap();
    
    private final Map switchParams = new ConcurrentHashMap();
    
    private static final long DAY_PERIOD_MINUTES = 24 * 60;
    
    private final String failoverDir;
    
    private final ServiceInfoHolder serviceInfoHolder;
    
    private final ScheduledExecutorService executorService;
    
    public FailoverReactor(ServiceInfoHolder serviceInfoHolder, String cacheDir) {
        // 持有ServiceInfoHolder的引用
        this.serviceInfoHolder = serviceInfoHolder;
        // 拼接故障目录:${user.home}/nacos/naming/public/failover
        this.failoverDir = cacheDir + FAILOVER_DIR;
        // 初始化executorService
        this.executorService = new ScheduledThreadPoolExecutor(1, new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                // 守护线程模式运行
                Thread thread = new Thread(r);
                thread.setDaemon(true);
                thread.setName("com.alibaba.nacos.naming.failover");
                return thread;
            }
        });
        // 其他初始化 *** 作,通过executorService开启多个定时任务执行
        this.init();
    }
}

  FailoverReactor的init方法中开启了三个定时任务,这三个任务都是FailoverReactor的内部类。

  • 初始化立即执行,执行间隔5秒,执行任务SwitchRefresher;
  • 初始化延迟30分钟执行,执行间隔24小时,执行任务DiskFileWriter;
  • 初始化立即执行,执行间隔10秒,执行核心 *** 作为DiskFileWriter。
public void init() {
	// 初始化立即执行,执行间隔5秒,执行任务SwitchRefresher
    executorService.scheduleWithFixedDelay(new SwitchRefresher(), 0L, 5000L, TimeUnit.MILLISECONDS);
	// 初始化延迟30分钟执行,执行间隔24小时,执行任务DiskFileWriter
    executorService.scheduleWithFixedDelay(new DiskFileWriter(), 30, DAY_PERIOD_MINUTES, TimeUnit.MINUTES);

    // backup file on startup if failover directory is empty.
    // 如果故障目录为空,启动时立即执行,立即备份文件
    // 初始化立即执行,执行间隔10秒,执行核心 *** 作为DiskFileWriter
    executorService.schedule(new Runnable() {
        @Override
        public void run() {
            try {
                File cacheDir = new File(failoverDir);

                if (!cacheDir.exists() && !cacheDir.mkdirs()) {
                    throw new IllegalStateException("failed to create cache dir: " + failoverDir);
                }

                File[] files = cacheDir.listFiles();
                if (files == null || files.length <= 0) {
                    new DiskFileWriter().run();
                }
            } catch (Throwable e) {
                NAMING_LOGGER.error("[NA] failed to backup file on startup.", e);
            }

        }
    }, 10000L, TimeUnit.MILLISECONDS);
}
2.2. SwitchRefresher

  SwitchRefresher的核心逻辑如下:

  1. 如果故障转移文件不存在,则直接返回(文件开关)
  2. 比较文件修改时间,如果已经被修改过,则获取故障转移文件中的内容。
  3. 故障转移文件中存储了0和1标识。0表示关闭,1表示开启。
  4. 当为开启状态时,执行线程FailoverFileReader。
class SwitchRefresher implements Runnable {

    long lastModifiedMillis = 0L;

    @Override
    public void run() {
        try {
            File switchFile = new File(failoverDir + UtilAndComs.FAILOVER_SWITCH);
            // 文件不存在则退出
            if (!switchFile.exists()) {
                //故障转移模式设置为关闭
                switchParams.put(FAILOVER_MODE_PARAM, Boolean.FALSE.toString());
                NAMING_LOGGER.debug("failover switch is not found, {}", switchFile.getName());
                return;
            }

            long modified = switchFile.lastModified();
			
            if (lastModifiedMillis < modified) {
                lastModifiedMillis = modified;
                // 获取故障转移文件内容
                String failover = ConcurrentDiskUtil.getFileContent(failoverDir + UtilAndComs.FAILOVER_SWITCH,
                                                                    Charset.defaultCharset().toString());
                if (!StringUtils.isEmpty(failover)) {
                    String[] lines = failover.split(DiskCache.getLineSeparator());

                    for (String line : lines) {
                        String line1 = line.trim();
                        // 1 表示开启故障转移模式
                        if (IS_FAILOVER_MODE.equals(line1)) {
                            //故障转移模式设置为开启
                            switchParams.put(FAILOVER_MODE_PARAM, Boolean.TRUE.toString());
                            NAMING_LOGGER.info("failover-mode is on");
                            new FailoverFileReader().run();
                        // 0 表示关闭故障转移模式
                        } else if (NO_FAILOVER_MODE.equals(line1)) {
                            //故障转移模式设置为关闭
                            switchParams.put(FAILOVER_MODE_PARAM, Boolean.FALSE.toString());
                            NAMING_LOGGER.info("failover-mode is off");
                        }
                    }
                } else {
                    //故障转移模式设置为关闭
                    switchParams.put(FAILOVER_MODE_PARAM, Boolean.FALSE.toString());
                }
            }

        } catch (Throwable e) {
            NAMING_LOGGER.error("[NA] failed to read failover switch.", e);
        }
    }
}
2.3. DiskFileWriter

  DiskFileWriter的工作就是获取serviceInfoHolder中的ServiceInfo消息, 判断是否满足写入磁盘,如果条件满足,就将其写入拼接的故障目录。后两个定时任务执行的都是DiskFileWriter,但第三个定时任务是有前置判断的,只要failoverDir文件夹不存在文件,就会立即执行DiskFileWriter。

class DiskFileWriter extends TimerTask {

    @Override
    public void run() {
        Map map = serviceInfoHolder.getServiceInfoMap();
        for (Map.Entry entry : map.entrySet()) {
            ServiceInfo serviceInfo = entry.getValue();
            if (StringUtils.equals(serviceInfo.getKey(), UtilAndComs.ALL_IPS) || StringUtils
                .equals(serviceInfo.getName(), UtilAndComs.ENV_LIST_KEY) || StringUtils
                .equals(serviceInfo.getName(), UtilAndComs.ENV_CONFIGS) || StringUtils
                .equals(serviceInfo.getName(), UtilAndComs.VIP_CLIENT_FILE) || StringUtils
                .equals(serviceInfo.getName(), UtilAndComs.ALL_HOSTS)) {
                continue;
            }
			// 将缓存写入磁盘
            DiskCache.write(serviceInfo, failoverDir);
        }
    }
}
2.4. FailoverFileReader

  FailoverFileReader _ 故障转移文件读取,基就是读取failover目录存储的备份服务信息文件,然后转换成ServiceInfo,并且将所有的ServiceInfo储存在FailoverReactor的ServiceMap属性中。

  1. 读取failover目录下的所有文件,进行遍历处理
  2. 如果文件不存在跳过
  3. 如果文件是故障转移开关标志文件跳过
  4. 读取文件中的备份内容,转换为ServiceInfo对象
  5. 将ServiceInfo对象放入到domMap中
  6. 最后判断domMap不为空,赋值给serviceMap
class FailoverFileReader implements Runnable {

    @Override
    public void run() {
        Map domMap = new HashMap(16);

        BufferedReader reader = null;
        try {

            File cacheDir = new File(failoverDir);
            if (!cacheDir.exists() && !cacheDir.mkdirs()) {
                throw new IllegalStateException("failed to create cache dir: " + failoverDir);
            }

            File[] files = cacheDir.listFiles();
            if (files == null) {
                return;
            }

            for (File file : files) {
                if (!file.isFile()) {
                    continue;
                }
				// 如果是故障转移标志文件,则跳过
                if (file.getName().equals(UtilAndComs.FAILOVER_SWITCH)) {
                    continue;
                }

                ServiceInfo dom = new ServiceInfo(file.getName());

                try {
                    String dataString = ConcurrentDiskUtil
                        .getFileContent(file, Charset.defaultCharset().toString());
                    reader = new BufferedReader(new StringReader(dataString));

                    String json;
                    if ((json = reader.readLine()) != null) {
                        try {
                            dom = JacksonUtils.toObj(json, ServiceInfo.class);
                        } catch (Exception e) {
                            NAMING_LOGGER.error("[NA] error while parsing cached dom : {}", json, e);
                        }
                    }

                } catch (Exception e) {
                    NAMING_LOGGER.error("[NA] failed to read cache for dom: {}", file.getName(), e);
                } finally {
                    try {
                        if (reader != null) {
                            reader.close();
                        }
                    } catch (Exception e) {
                        //ignore
                    }
                }
                if (!CollectionUtils.isEmpty(dom.getHosts())) {
                    domMap.put(dom.getKey(), dom);
                }
            }
        } catch (Exception e) {
            NAMING_LOGGER.error("[NA] failed to read cache file", e);
        }
		
        // 读入缓存
        if (domMap.size() > 0) {
            serviceMap = domMap;
        }
    }
}
2.5. ServiceInfoHolder.getServiceInfo

  ServiceInfoHolder在获得getServiceInfo的时候,会先调用failoverReactor.getService方法,判断是否开启故障转移。如果发生故障转移便会从failoverReactor.serviceMap中读取ServiceInfo.

public ServiceInfo getServiceInfo(final String serviceName, final String groupName, final String clusters) {
    NAMING_LOGGER.debug("failover-mode: {}", failoverReactor.isFailoverSwitch());
    String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);
    String key = ServiceInfo.getKey(groupedServiceName, clusters);
    if (failoverReactor.isFailoverSwitch()) {
        return failoverReactor.getService(key);
    }
    return serviceInfoMap.get(key);
}

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5686225.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存