Nacos集群节点管理器ServerMemberManagerNacos可以选择单机模式运行和集群模式运行,在生产中基本都要使用集群模式去运行的,而对于集群模式来说,每一个Nacos节点都需要通过配置知道其余节点的地址,比如说三个节点的集群,每个节点都需要知道另外两个节点的地址,才能够形成一个集群,下面我们就来看下Nacos节点在启动时是怎么获取其余节点地址进行集群初始化的
com.alibaba.nacos.core.cluster.ServerMemberManager
在Nacos服务启动的时候ServerMemberManager这个类专门对集群节点进行管理的,这个类在init方法中就会对集群进行初始化
protected void init() throws NacosException { Loggers.CORE.info("Nacos-related cluster resource initialization"); // 得到当前nacos服务的端口号 this.port = EnvUtil.getProperty("server.port", Integer.class, 8848); // 得到当前nacos服务的地址 this.localAddress = InetUtils.getSelfIP() + ":" + port; // 解析地址得到当前nacos服务所对应的集群节点对象 this.self = MemberUtil.singleParse(this.localAddress); // 给当前nacos服务设置一个版本号 this.self.setExtendVal(MembermetaDataConstants.VERSION, VersionUtils.version); // 把自己放到serverList中 serverList.put(self.getAddress(), self); // 该方法做了两件事 // 1.注册了一个集群节点信息变更事件 // 2.注册了订阅IPChangeEvent事件的事件订阅者 registerClusterEvent(); // 初始化节点地址寻址模式 // 在这里就可以通过配置的节点地址去初始化整个nacos集群节点集合了 initAndStartLookup(); if (serverList.isEmpty()) { throw new NacosException(NacosException.SERVER_ERROR, "cannot get serverlist, so exit."); } Loggers.CORE.info("The cluster resource is initialized"); }
com.alibaba.nacos.core.cluster.ServerMemberManager#registerClusterEvent
private void registerClusterEvent() { // 注册一个集群节点信息变更事件 NotifyCenter.registerToPublisher(MembersChangeEvent.class, EnvUtil.getProperty("nacos.member-change-event.queue.size", Integer.class, 128)); // 注册一个事件订阅者,订阅的事件类型是IPChangeEvent NotifyCenter.registerSubscriber(new Subscriber() { @Override public void onEvent(InetUtils.IPChangeEvent event) { String newAddress = event.getNewIP() + ":" + port; ServerMemberManager.this.localAddress = newAddress; EnvUtil.setLocalAddress(localAddress); Member self = ServerMemberManager.this.self; self.setIp(event.getNewIP()); String oldAddress = event.getOldIP() + ":" + port; ServerMemberManager.this.serverList.remove(oldAddress); ServerMemberManager.this.serverList.put(newAddress, self); ServerMemberManager.this.memberAddressInfos.remove(oldAddress); ServerMemberManager.this.memberAddressInfos.add(newAddress); } @Override public Class extends Event> subscribeType() { return InetUtils.IPChangeEvent.class; } }); }
这个方法会注册一个MembersChangeEvent事件,该事件是在集群节点发生变更的时候发布的,而对应的事件订阅者是ServerListManager,集群节点发生变更这一块后面的章节会详细讲,我们只需要知道在初始化阶段会注册这么一个事件就行了。同时该方法还会注册一个IPChangeEvent事件的事件订阅者,IPChangeEvent这个事件顾名思义就是当前节点IP发生变更之后发布的,该事件发布之后会被这个注册的订阅者所捕获,该订阅者做的事情也很简单,就是对集群节点集合中对应当前节点的ip进行更新就行了
com.alibaba.nacos.core.cluster.ServerMemberManager#initAndStartLookup
这一行代码就是初始化集群的关键
private void initAndStartLookup() throws NacosException { // 创建nacos集群节点寻址器 this.lookup = LookupFactory.createLookUp(this); this.lookup.start(); }
先是通过LookupFactory创建一个节点寻址器,然后调用start方法启动这个节点寻址器,那么什么是节点寻址器?我们知道Nacos配置集群节点地址的时候有两种方式,一种是直接读取本地配置文件配置好的节点地址,另一种是通过配置服务器获取配置的节点地址,两种方式对应两中不同方式的寻址器,所以这里就需要找到具体的寻址器
public static MemberLookup createLookUp(ServerMemberManager memberManager) throws NacosException { // 条件成立:当前nacos节点是集群模式 if (!EnvUtil.getStandaloneMode()) { // 从配置环境中获取nacos集群节点的寻址方式 String lookupType = EnvUtil.getProperty(LOOKUP_MODE_TYPE); LookupType type = chooseLookup(lookupType); // 得到对应的节点寻址器,FileConfigMemberLookup / AddressServerMemberLookup LOOK_UP = find(type); currentLookupType = type; } // 条件成立:当前nacos节点是单机模式 else { LOOK_UP = new StandaloneMemberLookup(); } LOOK_UP.injectMemberManager(memberManager); Loggers.CLUSTER.info("Current addressing mode selection : {}", LOOK_UP.getClass().getSimpleName()); return LOOK_UP; }
private static LookupType chooseLookup(String lookupType) { if (StringUtils.isNotBlank(lookupType)) { LookupType type = LookupType.sourceOf(lookupType); if (Objects.nonNull(type)) { return type; } } // 代码来到这里说明没有配置lookupType,此时会默认去寻找user.home/nacos/conf/cluster.conf文件 File file = new File(EnvUtil.getClusterConfFilePath()); // 条件成立:集群配置文件存在,或者环境变量配置了集群节点地址 if (file.exists() || StringUtils.isNotBlank(EnvUtil.getMemberList())) { // 返回文件寻址模式 return LookupType.FILE_CONFIG; } // 返回服务器寻址模式 return LookupType.ADDRESS_SERVER; }
private static MemberLookup find(LookupType type) { // 条件成立:集群配置方式是文件配置的方式 if (LookupType.FILE_CONFIG.equals(type)) { // 创建一个FileConfigMemberLookup对象并返回 LOOK_UP = new FileConfigMemberLookup(); return LOOK_UP; } // 条件成立:集群配置方式是通过服务器获取节点地址的方式 if (LookupType.ADDRESS_SERVER.equals(type)) { LOOK_UP = new AddressServerMemberLookup(); return LOOK_UP; } // unpossible to run here throw new IllegalArgumentException(); }
如果当前节点是集群模式,那么会去判断${user.home}/nacos/conf/cluster.conf这个文件是否存在或者环境变量中是否配置了集群节点地址,如果两者有一个成立就是文件寻址模式,反之是服务器寻址模式,而文件寻址模式对应的寻址器是FileConfigMemberLookup,服务器须知模式对应的寻址器是AddressServerMemberLookup,我们以文件寻址模式为例去看下是如何初始化集群的
com.alibaba.nacos.core.cluster.lookup.FileConfigMemberLookup
public class FileConfigMemberLookup extends AbstractMemberLookup { private FileWatcher watcher = new FileWatcher() { @Override public void onChange(FileChangeEvent event) { readClusterConfFromDisk(); } @Override public boolean interest(String context) { return StringUtils.contains(context, "cluster.conf"); } }; @Override public void start() throws NacosException { if (start.compareAndSet(false, true)) { // 从文件中读取集群节点地址 readClusterConfFromDisk(); try { // 使用notify机制监控文件的变化,并自动触发读取cluster.conf WatchFileCenter.registerWatcher(EnvUtil.getConfPath(), watcher); } catch (Throwable e) { Loggers.CLUSTER.error("An exception occurred in the launch file monitor : {}", e.getMessage()); } } } @Override public void destroy() throws NacosException { WatchFileCenter.deregisterWatcher(EnvUtil.getConfPath(), watcher); } private void readClusterConfFromDisk() { CollectiontmpMembers = new ArrayList<>(); try { // 获取到cluster.conf文件中配置的节点地址列表 List tmp = EnvUtil.readClusterConf(); // 把这些节点地址分别转换成对应的集群节点对象 tmpMembers = MemberUtil.readServerConf(tmp); } catch (Throwable e) { Loggers.CLUSTER .error("nacos-XXXX [serverlist] failed to get serverlist from disk!, error : {}", e.getMessage()); } afterLookup(tmpMembers); } }
在start方法中会先调用readClusterConfFromDisk方法,这个方法会读取${user.home}/nacos/conf/cluster.conf这个文件中配置的节点地址,读取到之后把这些节点地址转化为对应的Member对象,一个Member对象就代表一个节点,接着会调用父类AbstractMemberLookup的afterLookup方法
public void afterLookup(Collectionmembers) { this.memberManager.memberChange(members); }
调用的是集群节点管理器的menberChange方法,同时把上面从cluster.conf文件中读取到的Member节点集合作为方法参数,我们仔细看下menberChange这个方法
com.alibaba.nacos.core.cluster.ServerMemberManager#memberChange
synchronized boolean memberChange(Collectionmembers) { if (members == null || members.isEmpty()) { return false; } // 配置的集群节点地址是否包含当前nacos节点 boolean isContainSelfIp = members.stream() .anyMatch(ipPortTmp -> Objects.equals(localAddress, ipPortTmp.getAddress())); if (isContainSelfIp) { isInIpList = true; } else { isInIpList = false; members.add(this.self); Loggers.CLUSTER.warn("[serverlist] self ip {} not in serverlist {}", self, members); } // If the number of old and new clusters is different, the cluster information // must have changed; if the number of clusters is the same, then compare whether // there is a difference; if there is a difference, then the cluster node changes // are involved and all recipients need to be notified of the node change event boolean hasChange = members.size() != serverList.size(); ConcurrentSkipListMap tmpMap = new ConcurrentSkipListMap<>(); Set tmpAddressInfo = new ConcurrentHashSet<>(); for (Member member : members) { final String address = member.getAddress(); if (!serverList.containsKey(address)) { hasChange = true; // 如果cluster.conf或address-server中的cluster信息被更改,而对应的nacos-server还没有启动,则成员的状态应该设置为DOWN, // 如果相应的nacos-server已经启动,则在几秒钟后检测到该成员的状态将被设置为UP member.setState(NodeState.DOWN); } else { //fix issue # 4925 member.setState(serverList.get(address).getState()); } // Ensure that the node is created only once tmpMap.put(address, member); if (NodeState.UP.equals(member.getState())) { tmpAddressInfo.add(address); } } // 更新serverList为最新的集群节点集合 serverList = tmpMap; // 更新memberAddressInfos为最新的集群节点地址 memberAddressInfos = tmpAddressInfo; // 获取更新之后集群所有的节点对象 Collection finalMembers = allMembers(); Loggers.CLUSTER.warn("[serverlist] updated to : {}", finalMembers); // Persist the current cluster node information to cluster.conf // need to put the event publication into a synchronized block to ensure // that the event publication is sequential // 条件成立:1.集群中有节点增加了 // 2.集群中有节点下线了 // 3.手动增加或者删除了节点配置地址信息 if (hasChange) { // 把最新的节点写入到配置中(cluster.conf或者address-server) MemberUtil.syncToFile(finalMembers); // 发布一个MembersChangeEvent事件 Event event = MembersChangeEvent.builder().members(finalMembers).build(); NotifyCenter.publishEvent(event); } return hasChange; }
这个方法中主要就是遍历配置文件中的Member集合,遍历的时候对每一个Member对象都会判断是否存在于serverList这个集合中,这里不存在的话是什么情况呢?我们可以想一下当前这个节点启动了,但是它是并不知道其他配置的节点有没有启动的,所以这里会把不存在于serverList的Member对象状态设置为NodeState.DOWN,如果其他节点还没启动,在这些节点启动之后就会互相进行发送心跳,当前节点检测到其他节点的心跳包之后就会把状态更新为NodeState.UP,节点之间发送心跳这一块后面的章节再详细说。而什么时候遍历的Member对象会存在于serverList集合中呢?下面我们会讲到在配置更改的时候也会触发这个方法,此时有可能serverList中就会存在遍历的Member对象了,那么此时只需要把serverList中这个Member对象的状态更新为遍历到的这个Member对象的状态就可以了。接着如果是集群节点的数量发生改变的话,就会发布一个MembersChangeEvent事件,而这个事件对应的订阅者是ServerListManager这个类,在这个类中也保存了整个nacos集群所有的节点集合,在回调它的订阅方法时很简单就是把这个集合属性重新赋值,代码如下:
@Override public void onEvent(MembersChangeEvent event) { // 把最新的集群节点集合重新赋值到servers this.servers = new ArrayList<>(event.getMembers()); }
回到刚开始的start方法,读取配置文件的过程已经分析完了,但是下面还有句代码:
// 使用notify机制监控文件的变化,并自动触发读取cluster.conf WatchFileCenter.registerWatcher(EnvUtil.getConfPath(), watcher);
public static synchronized boolean registerWatcher(final String paths, FileWatcher watcher) throws NacosException { checkState(); if (NOW_WATCH_JOB_CNT == MAX_WATCH_FILE_JOB) { return false; } WatchDirJob job = MANAGER.get(paths); if (job == null) { job = new WatchDirJob(paths); job.start(); MANAGER.put(paths, job); NOW_WATCH_JOB_CNT++; } job.addSubscribe(watcher); return true; }
private FileWatcher watcher = new FileWatcher() { @Override public void onChange(FileChangeEvent event) { readClusterConfFromDisk(); } @Override public boolean interest(String context) { return StringUtils.contains(context, "cluster.conf"); } };
从上面的代码可以知道这是对cluster.conf文件注册一个监听器,当这个文件发生变更的时候就会触发FileWatcher中的onChange回调方法,在onChange回调方法中会再次调用上面说的readClusterConfFromDisk方法,readClusterConfFromDisk方法就会重新读取一遍cluster.conf文件中的节点地址。自此Nacos节点启动初始化集群的流程就结束了。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)