Nacos集群(一)节点启动初始化源码解析

Nacos集群(一)节点启动初始化源码解析,第1张

Nacos集群(一)节点启动初始化源码解析

Nacos可以选择单机模式运行和集群模式运行,在生产中基本都要使用集群模式去运行的,而对于集群模式来说,每一个Nacos节点都需要通过配置知道其余节点的地址,比如说三个节点的集群,每个节点都需要知道另外两个节点的地址,才能够形成一个集群,下面我们就来看下Nacos节点在启动时是怎么获取其余节点地址进行集群初始化的

Nacos集群节点管理器ServerMemberManager

com.alibaba.nacos.core.cluster.ServerMemberManager

在Nacos服务启动的时候ServerMemberManager这个类专门对集群节点进行管理的,这个类在init方法中就会对集群进行初始化 

protected void init() throws NacosException {
    Loggers.CORE.info("Nacos-related cluster resource initialization");
    // 得到当前nacos服务的端口号
    this.port = EnvUtil.getProperty("server.port", Integer.class, 8848);
    // 得到当前nacos服务的地址
    this.localAddress = InetUtils.getSelfIP() + ":" + port;
    // 解析地址得到当前nacos服务所对应的集群节点对象
    this.self = MemberUtil.singleParse(this.localAddress);
    // 给当前nacos服务设置一个版本号
    this.self.setExtendVal(MembermetaDataConstants.VERSION, VersionUtils.version);
    // 把自己放到serverList中
    serverList.put(self.getAddress(), self);

    // 该方法做了两件事
    // 1.注册了一个集群节点信息变更事件
    // 2.注册了订阅IPChangeEvent事件的事件订阅者
    registerClusterEvent();

    // 初始化节点地址寻址模式
    // 在这里就可以通过配置的节点地址去初始化整个nacos集群节点集合了
    initAndStartLookup();

    if (serverList.isEmpty()) {
        throw new NacosException(NacosException.SERVER_ERROR, "cannot get serverlist, so exit.");
    }
    
    Loggers.CORE.info("The cluster resource is initialized");
}

 com.alibaba.nacos.core.cluster.ServerMemberManager#registerClusterEvent

private void registerClusterEvent() {
    // 注册一个集群节点信息变更事件
    NotifyCenter.registerToPublisher(MembersChangeEvent.class,
            EnvUtil.getProperty("nacos.member-change-event.queue.size", Integer.class, 128));
    
    // 注册一个事件订阅者,订阅的事件类型是IPChangeEvent
    NotifyCenter.registerSubscriber(new Subscriber() {
        @Override
        public void onEvent(InetUtils.IPChangeEvent event) {
            String newAddress = event.getNewIP() + ":" + port;
            ServerMemberManager.this.localAddress = newAddress;
            EnvUtil.setLocalAddress(localAddress);
            
            Member self = ServerMemberManager.this.self;
            self.setIp(event.getNewIP());
            
            String oldAddress = event.getOldIP() + ":" + port;
            ServerMemberManager.this.serverList.remove(oldAddress);
            ServerMemberManager.this.serverList.put(newAddress, self);
            
            ServerMemberManager.this.memberAddressInfos.remove(oldAddress);
            ServerMemberManager.this.memberAddressInfos.add(newAddress);
        }
        
        @Override
        public Class subscribeType() {
            return InetUtils.IPChangeEvent.class;
        }
    });
}

 这个方法会注册一个MembersChangeEvent事件,该事件是在集群节点发生变更的时候发布的,而对应的事件订阅者是ServerListManager,集群节点发生变更这一块后面的章节会详细讲,我们只需要知道在初始化阶段会注册这么一个事件就行了。同时该方法还会注册一个IPChangeEvent事件的事件订阅者,IPChangeEvent这个事件顾名思义就是当前节点IP发生变更之后发布的,该事件发布之后会被这个注册的订阅者所捕获,该订阅者做的事情也很简单,就是对集群节点集合中对应当前节点的ip进行更新就行了

com.alibaba.nacos.core.cluster.ServerMemberManager#initAndStartLookup 

这一行代码就是初始化集群的关键

private void initAndStartLookup() throws NacosException {
    // 创建nacos集群节点寻址器
    this.lookup = LookupFactory.createLookUp(this);
    this.lookup.start();
}

 先是通过LookupFactory创建一个节点寻址器,然后调用start方法启动这个节点寻址器,那么什么是节点寻址器?我们知道Nacos配置集群节点地址的时候有两种方式,一种是直接读取本地配置文件配置好的节点地址,另一种是通过配置服务器获取配置的节点地址,两种方式对应两中不同方式的寻址器,所以这里就需要找到具体的寻址器

 

public static MemberLookup createLookUp(ServerMemberManager memberManager) throws NacosException {
    // 条件成立:当前nacos节点是集群模式
    if (!EnvUtil.getStandaloneMode()) {
        // 从配置环境中获取nacos集群节点的寻址方式
        String lookupType = EnvUtil.getProperty(LOOKUP_MODE_TYPE);
        LookupType type = chooseLookup(lookupType);

        // 得到对应的节点寻址器,FileConfigMemberLookup / AddressServerMemberLookup
        LOOK_UP = find(type);
        currentLookupType = type;
    }
    // 条件成立:当前nacos节点是单机模式
    else {
        LOOK_UP = new StandaloneMemberLookup();
    }
    LOOK_UP.injectMemberManager(memberManager);
    Loggers.CLUSTER.info("Current addressing mode selection : {}", LOOK_UP.getClass().getSimpleName());
    return LOOK_UP;
}
private static LookupType chooseLookup(String lookupType) {
    if (StringUtils.isNotBlank(lookupType)) {
        LookupType type = LookupType.sourceOf(lookupType);
        if (Objects.nonNull(type)) {
            return type;
        }
    }

    // 代码来到这里说明没有配置lookupType,此时会默认去寻找user.home/nacos/conf/cluster.conf文件
    File file = new File(EnvUtil.getClusterConfFilePath());
    // 条件成立:集群配置文件存在,或者环境变量配置了集群节点地址
    if (file.exists() || StringUtils.isNotBlank(EnvUtil.getMemberList())) {
        // 返回文件寻址模式
        return LookupType.FILE_CONFIG;
    }

    // 返回服务器寻址模式
    return LookupType.ADDRESS_SERVER;
}
private static MemberLookup find(LookupType type) {
    // 条件成立:集群配置方式是文件配置的方式
    if (LookupType.FILE_CONFIG.equals(type)) {
        // 创建一个FileConfigMemberLookup对象并返回
        LOOK_UP = new FileConfigMemberLookup();
        return LOOK_UP;
    }

    // 条件成立:集群配置方式是通过服务器获取节点地址的方式
    if (LookupType.ADDRESS_SERVER.equals(type)) {
        LOOK_UP = new AddressServerMemberLookup();
        return LOOK_UP;
    }
    // unpossible to run here
    throw new IllegalArgumentException();
}

如果当前节点是集群模式,那么会去判断${user.home}/nacos/conf/cluster.conf这个文件是否存在或者环境变量中是否配置了集群节点地址,如果两者有一个成立就是文件寻址模式,反之是服务器寻址模式,而文件寻址模式对应的寻址器是FileConfigMemberLookup,服务器须知模式对应的寻址器是AddressServerMemberLookup,我们以文件寻址模式为例去看下是如何初始化集群的

com.alibaba.nacos.core.cluster.lookup.FileConfigMemberLookup 

public class FileConfigMemberLookup extends AbstractMemberLookup {

    
    private FileWatcher watcher = new FileWatcher() {

        
        @Override
        public void onChange(FileChangeEvent event) {
            readClusterConfFromDisk();
        }
        
        @Override
        public boolean interest(String context) {
            return StringUtils.contains(context, "cluster.conf");
        }
    };
    
    @Override
    public void start() throws NacosException {
        if (start.compareAndSet(false, true)) {
            // 从文件中读取集群节点地址
            readClusterConfFromDisk();

            try {
                // 使用notify机制监控文件的变化,并自动触发读取cluster.conf
                WatchFileCenter.registerWatcher(EnvUtil.getConfPath(), watcher);
            } catch (Throwable e) {
                Loggers.CLUSTER.error("An exception occurred in the launch file monitor : {}", e.getMessage());
            }
        }
    }
    
    @Override
    public void destroy() throws NacosException {
        WatchFileCenter.deregisterWatcher(EnvUtil.getConfPath(), watcher);
    }

    
    private void readClusterConfFromDisk() {
        Collection tmpMembers = new ArrayList<>();
        try {
            // 获取到cluster.conf文件中配置的节点地址列表
            List tmp = EnvUtil.readClusterConf();
            // 把这些节点地址分别转换成对应的集群节点对象
            tmpMembers = MemberUtil.readServerConf(tmp);
        } catch (Throwable e) {
            Loggers.CLUSTER
                    .error("nacos-XXXX [serverlist] failed to get serverlist from disk!, error : {}", e.getMessage());
        }
        
        afterLookup(tmpMembers);
    }
}

 在start方法中会先调用readClusterConfFromDisk方法,这个方法会读取${user.home}/nacos/conf/cluster.conf这个文件中配置的节点地址,读取到之后把这些节点地址转化为对应的Member对象,一个Member对象就代表一个节点,接着会调用父类AbstractMemberLookup的afterLookup方法

public void afterLookup(Collection members) {
    this.memberManager.memberChange(members);
}

 调用的是集群节点管理器的menberChange方法,同时把上面从cluster.conf文件中读取到的Member节点集合作为方法参数,我们仔细看下menberChange这个方法

com.alibaba.nacos.core.cluster.ServerMemberManager#memberChange 

synchronized boolean memberChange(Collection members) {
    
    if (members == null || members.isEmpty()) {
        return false;
    }

    // 配置的集群节点地址是否包含当前nacos节点
    boolean isContainSelfIp = members.stream()
            .anyMatch(ipPortTmp -> Objects.equals(localAddress, ipPortTmp.getAddress()));
    
    if (isContainSelfIp) {
        isInIpList = true;
    } else {

        isInIpList = false;
        members.add(this.self);
        Loggers.CLUSTER.warn("[serverlist] self ip {} not in serverlist {}", self, members);
    }
    
    // If the number of old and new clusters is different, the cluster information
    // must have changed; if the number of clusters is the same, then compare whether
    // there is a difference; if there is a difference, then the cluster node changes
    // are involved and all recipients need to be notified of the node change event
    
    boolean hasChange = members.size() != serverList.size();
    ConcurrentSkipListMap tmpMap = new ConcurrentSkipListMap<>();
    Set tmpAddressInfo = new ConcurrentHashSet<>();
    for (Member member : members) {
        final String address = member.getAddress();
        
        if (!serverList.containsKey(address)) {
            hasChange = true;
            // 如果cluster.conf或address-server中的cluster信息被更改,而对应的nacos-server还没有启动,则成员的状态应该设置为DOWN,
            // 如果相应的nacos-server已经启动,则在几秒钟后检测到该成员的状态将被设置为UP
            member.setState(NodeState.DOWN);
        } else {
            //fix issue # 4925
            member.setState(serverList.get(address).getState());
        }
        
        // Ensure that the node is created only once
        tmpMap.put(address, member);
        if (NodeState.UP.equals(member.getState())) {
            tmpAddressInfo.add(address);
        }
    }

    // 更新serverList为最新的集群节点集合
    serverList = tmpMap;
    // 更新memberAddressInfos为最新的集群节点地址
    memberAddressInfos = tmpAddressInfo;

    // 获取更新之后集群所有的节点对象
    Collection finalMembers = allMembers();
    
    Loggers.CLUSTER.warn("[serverlist] updated to : {}", finalMembers);
    
    // Persist the current cluster node information to cluster.conf
    //  need to put the event publication into a synchronized block to ensure
    // that the event publication is sequential
    // 条件成立:1.集群中有节点增加了
    //          2.集群中有节点下线了
    //          3.手动增加或者删除了节点配置地址信息
    if (hasChange) {
        // 把最新的节点写入到配置中(cluster.conf或者address-server)
        MemberUtil.syncToFile(finalMembers);
        // 发布一个MembersChangeEvent事件
        Event event = MembersChangeEvent.builder().members(finalMembers).build();
        NotifyCenter.publishEvent(event);
    }
    
    return hasChange;
}

这个方法中主要就是遍历配置文件中的Member集合,遍历的时候对每一个Member对象都会判断是否存在于serverList这个集合中,这里不存在的话是什么情况呢?我们可以想一下当前这个节点启动了,但是它是并不知道其他配置的节点有没有启动的,所以这里会把不存在于serverList的Member对象状态设置为NodeState.DOWN,如果其他节点还没启动,在这些节点启动之后就会互相进行发送心跳,当前节点检测到其他节点的心跳包之后就会把状态更新为NodeState.UP,节点之间发送心跳这一块后面的章节再详细说。而什么时候遍历的Member对象会存在于serverList集合中呢?下面我们会讲到在配置更改的时候也会触发这个方法,此时有可能serverList中就会存在遍历的Member对象了,那么此时只需要把serverList中这个Member对象的状态更新为遍历到的这个Member对象的状态就可以了。接着如果是集群节点的数量发生改变的话,就会发布一个MembersChangeEvent事件,而这个事件对应的订阅者是ServerListManager这个类,在这个类中也保存了整个nacos集群所有的节点集合,在回调它的订阅方法时很简单就是把这个集合属性重新赋值,代码如下:

@Override
public void onEvent(MembersChangeEvent event) {
    // 把最新的集群节点集合重新赋值到servers
    this.servers = new ArrayList<>(event.getMembers());
}

 回到刚开始的start方法,读取配置文件的过程已经分析完了,但是下面还有句代码:

// 使用notify机制监控文件的变化,并自动触发读取cluster.conf
WatchFileCenter.registerWatcher(EnvUtil.getConfPath(), watcher);
public static synchronized boolean registerWatcher(final String paths, FileWatcher watcher) throws NacosException {
    checkState();
    if (NOW_WATCH_JOB_CNT == MAX_WATCH_FILE_JOB) {
        return false;
    }
    WatchDirJob job = MANAGER.get(paths);
    if (job == null) {
        job = new WatchDirJob(paths);
        job.start();
        MANAGER.put(paths, job);
        NOW_WATCH_JOB_CNT++;
    }
    job.addSubscribe(watcher);
    return true;
}
private FileWatcher watcher = new FileWatcher() {

    
    @Override
    public void onChange(FileChangeEvent event) {
        readClusterConfFromDisk();
    }
    
    @Override
    public boolean interest(String context) {
        return StringUtils.contains(context, "cluster.conf");
    }
};

 从上面的代码可以知道这是对cluster.conf文件注册一个监听器,当这个文件发生变更的时候就会触发FileWatcher中的onChange回调方法,在onChange回调方法中会再次调用上面说的readClusterConfFromDisk方法,readClusterConfFromDisk方法就会重新读取一遍cluster.conf文件中的节点地址。自此Nacos节点启动初始化集群的流程就结束了。

 

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5709871.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存