2021SC@SDUSC
Continuing from the previous post, this post focuses on the finishInitialization() method.
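Master Startup Process: Source Code Analysis

The finishInitialization() method initializes the master's core functionality. The method is long, and its full code was already posted in the previous post. It is not repeated in full here; the relevant excerpts are shown as the analysis proceeds.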
1. Set this master's isActiveMaster flag to true

```java
isActiveMaster = true;
```
2. Set masterActiveTime to the current system time

```java
this.masterActiveTime = System.currentTimeMillis();
```
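3. Initialize MasterFileSystem, which wraps the file system operations the master commonly performs. These include deleting region directories, deleting table directories, deleting column family directories, and checking the file system status. Then create an FSTableDescriptors object.

```java
this.fileSystemManager = new MasterFileSystem(this, this, masterRecovery);
this.tableDescriptors = new FSTableDescriptors(this.conf,
    this.fileSystemManager.getFileSystem(),
    this.fileSystemManager.getRootDir());
```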
4. Write the cluster ID to ZooKeeper's "hbaseid" znode, then initialize ExecutorService and ServerManager.

ExecutorService maintains an executor map with one Executor per event type. Asynchronous events are executed by submitting EventHandlers.

ServerManager manages region server information. It maintains the lists of online and dead region servers and handles region server startups, shutdowns, and deaths.

```java
status.setStatus("Publishing Cluster ID in ZooKeeper");
ZKClusterId.setClusterId(this.zooKeeper, fileSystemManager.getClusterId());
if (!masterRecovery) {
  this.executorService = new ExecutorService(getServerName().toShortString());
  this.serverManager = createServerManager(this, this);
}
```
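The following is a rough illustration of the "one executor per event type" idea, built on java.util.concurrent. It is a minimal sketch that assumes a fixed-size thread pool per event type. The class EventExecutors, its EventType values, and the simplified EventHandler are hypothetical names for this example and do not reproduce HBase's ExecutorService API.

```java
import java.util.EnumMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

/** Hypothetical miniature of "one executor per event type"; not HBase's ExecutorService. */
final class EventExecutors {
  enum EventType { OPEN_REGION, CLOSE_REGION, SERVER_SHUTDOWN }

  /** Simplified EventHandler: an event type plus the work to run for it. */
  static final class EventHandler implements Runnable {
    final EventType type;
    private final Runnable body;

    EventHandler(EventType type, Runnable body) {
      this.type = type;
      this.body = body;
    }

    @Override
    public void run() { body.run(); }
  }

  private final Map<EventType, ExecutorService> executors = new EnumMap<>(EventType.class);

  /** Start a fixed-size thread pool dedicated to one event type. */
  void startExecutor(EventType type, int threads) {
    executors.put(type, Executors.newFixedThreadPool(threads));
  }

  /** Route a handler asynchronously to the pool registered for its event type. */
  Future<?> submit(EventHandler handler) {
    ExecutorService pool = executors.get(handler.type);
    if (pool == null) {
      throw new IllegalStateException("No executor started for " + handler.type);
    }
    return pool.submit(handler);
  }

  /** Stop accepting work and wait up to the given time for queued handlers to finish. */
  boolean shutdownAndAwait(long seconds) {
    executors.values().forEach(ExecutorService::shutdown);
    try {
      for (ExecutorService pool : executors.values()) {
        if (!pool.awaitTermination(seconds, TimeUnit.SECONDS)) return false;
      }
      return true;
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    }
  }
}
```

Keeping a separate pool per event type means a burst of one kind of event (say, region opens) cannot starve handlers of another kind in this sketch.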
5. Call initializeZKBasedSystemTrackers() to initialize the various ZooKeeper-based components

```java
initializeZKBasedSystemTrackers();
```
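Looking at initializeZKBasedSystemTrackers(), it does the following:
a) Creates the CatalogTracker, which contains a RootRegionTracker and a MetaNodeTracker. These correspond to the "/hbase/root-region-server" and "/hbase/unassigned/1028785192" znodes, respectively.
b) Creates the LoadBalancer, which is responsible for moving regions between region servers.
c) Creates the AssignmentManager, which manages and assigns regions. It also receives region events from ZooKeeper and, based on them, brings regions online or offline, closes them, and opens them. It is registered as the first listener on the ZooKeeper watcher (registerListenerFirst).
d) Creates the RegionServerTracker, which monitors the "/hbase/rs" znode and tracks online region servers through ZooKeeper events. When a region server goes offline, its entry is removed from ServerManager's online list.
e) Creates the DrainingServerTracker, which monitors the "/hbase/draining" znode.
f) Uses the ClusterStatusTracker, which monitors the "/hbase/shutdown" znode to maintain the cluster state, to set the cluster-up flag if it is not already set.
g) Creates the SnapshotManager, which manages HBase snapshots, including taking and restoring them. It registers the SnapshotManager with a MasterProcedureManagerHost.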
```java
void initializeZKBasedSystemTrackers() throws IOException,
    InterruptedException, KeeperException {
  this.catalogTracker = createCatalogTracker(this.zooKeeper, this.conf, this);
  this.catalogTracker.start();

  this.balancer = LoadBalancerFactory.getLoadBalancer(conf);
  this.loadBalancerTracker = new LoadBalancerTracker(zooKeeper, this);
  this.loadBalancerTracker.start();
  this.assignmentManager = new AssignmentManager(this, serverManager,
      this.catalogTracker, this.balancer, this.executorService,
      this.metricsMaster, this.tableLockManager);
  zooKeeper.registerListenerFirst(assignmentManager);

  this.regionServerTracker = new RegionServerTracker(zooKeeper, this,
      this.serverManager);
  this.regionServerTracker.start();

  this.drainingServerTracker = new DrainingServerTracker(zooKeeper, this,
      this.serverManager);
  this.drainingServerTracker.start();

  // Set the cluster-up flag if it is not already set
  boolean wasUp = this.clusterStatusTracker.isClusterUp();
  if (!wasUp) this.clusterStatusTracker.setClusterUp();

  LOG.info("Server active/primary master=" + this.serverName +
      ", sessionid=0x" +
      Long.toHexString(this.zooKeeper.getRecoverableZooKeeper().getSessionId()) +
      ", setting cluster-up flag (Was=" + wasUp + ")");

  // Create the snapshot manager and register it with the procedure manager host
  this.snapshotManager = new SnapshotManager();
  this.mpmHost = new MasterProcedureManagerHost();
  this.mpmHost.register(this.snapshotManager);
  this.mpmHost.loadProcedures(conf);
  this.mpmHost.initialize(this, this.metricsMaster);
}
```
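The call zooKeeper.registerListenerFirst(assignmentManager) matters because listener order decides who reacts to a ZooKeeper event first. The following is a minimal sketch that assumes listeners are kept in a list and notified in order. MiniWatcher and its Listener interface are hypothetical and much simpler than HBase's ZooKeeperWatcher; event delivery is reduced to plain method calls.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

/** Hypothetical watcher that fans events out to listeners in list order. */
final class MiniWatcher {
  interface Listener {
    void nodeDeleted(String path);
  }

  // Copy-on-write so listeners can be registered while events are being delivered
  private final List<Listener> listeners = new CopyOnWriteArrayList<>();

  /** Append: this listener is notified after those already registered. */
  void registerListener(Listener l) { listeners.add(l); }

  /** Insert at the head: this listener sees every event before the others. */
  void registerListenerFirst(Listener l) { listeners.add(0, l); }

  /** Deliver a node-deleted event to every listener, in list order. */
  void fireNodeDeleted(String path) {
    for (Listener l : listeners) {
      l.nodeDeleted(path);
    }
  }
}
```

With the assignment logic registered first, it has already processed an event by the time later trackers (such as a region-server tracker) react to the same event.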
6. Initialize MasterCoprocessorHost, which manages the coprocessors for all master-related operations. Then call startServiceThreads() to start the various services.

```java
if (!masterRecovery) {
  status.setStatus("Initializing master coprocessors");
  this.cpHost = new MasterCoprocessorHost(this, this.conf);

  spanReceiverHost = SpanReceiverHost.getInstance(getConfiguration());

  status.setStatus("Initializing master service threads");
  startServiceThreads();
}
```
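Looking at startServiceThreads(), it works in four stages:
- It starts the thread pools for six event types: MASTER_OPEN_REGION, MASTER_CLOSE_REGION, MASTER_SERVER_OPERATIONS, MASTER_META_SERVER_OPERATIONS, M_LOG_REPLAY_OPS, and MASTER_TABLE_OPERATIONS. Pool sizes come from configuration, except MASTER_TABLE_OPERATIONS, which uses a single thread.
- It creates the logCleaner and the hfileCleaner and starts each as a daemon thread.
- It starts the healthCheckChore if one is configured.
- It calls RpcServer's openServer() so that the master begins responding to requests: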
```java
void startServiceThreads() throws IOException {
  // Start the executor pools, one per event type
  this.executorService.startExecutorService(ExecutorType.MASTER_OPEN_REGION,
      conf.getInt("hbase.master.executor.openregion.threads", 5));
  this.executorService.startExecutorService(ExecutorType.MASTER_CLOSE_REGION,
      conf.getInt("hbase.master.executor.closeregion.threads", 5));
  this.executorService.startExecutorService(ExecutorType.MASTER_SERVER_OPERATIONS,
      conf.getInt("hbase.master.executor.serverops.threads", 5));
  this.executorService.startExecutorService(ExecutorType.MASTER_META_SERVER_OPERATIONS,
      conf.getInt("hbase.master.executor.serverops.threads", 5));
  this.executorService.startExecutorService(ExecutorType.M_LOG_REPLAY_OPS,
      conf.getInt("hbase.master.executor.logreplayops.threads", 10));
  // Table operations run on a single thread
  this.executorService.startExecutorService(ExecutorType.MASTER_TABLE_OPERATIONS, 1);

  // Start the cleaner for old log files
  String n = Thread.currentThread().getName();
  int cleanerInterval = conf.getInt("hbase.master.cleaner.interval", 60 * 1000);
  this.logCleaner = new LogCleaner(cleanerInterval, this, conf,
      getMasterFileSystem().getFileSystem(),
      getMasterFileSystem().getOldLogDir());
  Threads.setDaemonThreadRunning(logCleaner.getThread(), n + ".oldLogCleaner");

  // Start the cleaner for archived HFiles
  Path archiveDir = HFileArchiveUtil.getArchivePath(conf);
  this.hfileCleaner = new HFileCleaner(cleanerInterval, this, conf,
      getMasterFileSystem().getFileSystem(), archiveDir);
  Threads.setDaemonThreadRunning(hfileCleaner.getThread(), n + ".archivedHFileCleaner");

  // Start the health check chore, if one is configured
  if (this.healthCheckChore != null) {
    Threads.setDaemonThreadRunning(this.healthCheckChore.getThread(), n + ".healthChecker");
  }

  // Open the RPC server so the master starts answering requests
  this.rpcServer.openServer();
  this.rpcServerOpen = true;
  if (LOG.isTraceEnabled()) {
    LOG.trace("Started service threads");
  }
}
```
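The following sketch shows how the configuration keys in startServiceThreads() interact. It resolves each pool size as "configured value if set, otherwise the default". java.util.Properties stands in for HBase's Configuration, and the ExecutorPoolSizes class and its getInt helper are hypothetical. The keys and defaults are copied from the excerpt.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Properties;

/** Hypothetical resolver for the master executor pool sizes; Properties stands in for Configuration. */
final class ExecutorPoolSizes {
  /** Return the configured integer for key, or defaultValue when the key is absent. */
  static int getInt(Properties conf, String key, int defaultValue) {
    String v = conf.getProperty(key);
    return v == null ? defaultValue : Integer.parseInt(v.trim());
  }

  /** Executor type name -> thread count, using the keys and defaults from startServiceThreads(). */
  static Map<String, Integer> resolve(Properties conf) {
    Map<String, Integer> sizes = new LinkedHashMap<>();
    sizes.put("MASTER_OPEN_REGION", getInt(conf, "hbase.master.executor.openregion.threads", 5));
    sizes.put("MASTER_CLOSE_REGION", getInt(conf, "hbase.master.executor.closeregion.threads", 5));
    sizes.put("MASTER_SERVER_OPERATIONS", getInt(conf, "hbase.master.executor.serverops.threads", 5));
    // The meta pool reads the same key as the regular server-operations pool
    sizes.put("MASTER_META_SERVER_OPERATIONS", getInt(conf, "hbase.master.executor.serverops.threads", 5));
    sizes.put("M_LOG_REPLAY_OPS", getInt(conf, "hbase.master.executor.logreplayops.threads", 10));
    sizes.put("MASTER_TABLE_OPERATIONS", 1); // hard-coded to a single thread
    return sizes;
  }
}
```

The excerpt shows that MASTER_META_SERVER_OPERATIONS reads hbase.master.executor.serverops.threads. Setting that key therefore resizes both the regular and the meta server-operations pools.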
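7. Call ServerManager.waitForRegionServers() to wait for the region servers to report in. It returns once all of the following hold:
- at least 4.5 s have elapsed (hbase.master.wait.on.regionservers.timeout)
- at least one region server has started successfully (hbase.master.wait.on.regionservers.mintostart)
- the number of reported region servers has not changed for 1.5 s, that is, no region server has come up, died, or restarted in that window (hbase.master.wait.on.regionservers.interval)

```java
this.serverManager.waitForRegionServers(status);
```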
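The three exit conditions can be expressed as a polling loop. This is a hypothetical sketch, not ServerManager's code. The clock, the server count, and the sleep are injected so the loop can be driven by a fake clock, and the constants mirror the defaults listed in step 7. The real method may have further exit paths (for example, when the master is stopped).

```java
import java.util.function.IntSupplier;
import java.util.function.LongSupplier;

/** Hypothetical sketch of the waitForRegionServers() exit condition. */
final class RegionServerWait {
  static final long TIMEOUT_MS = 4500;   // hbase.master.wait.on.regionservers.timeout
  static final int MIN_TO_START = 1;     // hbase.master.wait.on.regionservers.mintostart
  static final long INTERVAL_MS = 1500;  // hbase.master.wait.on.regionservers.interval

  /** Polls until all three conditions hold and returns the final server count. */
  static int waitForRegionServers(IntSupplier countOfServers, LongSupplier clock,
                                  Runnable sleepBriefly) {
    long start = clock.getAsLong();
    long lastCountChange = start;
    int count = countOfServers.getAsInt();
    while (true) {
      long now = clock.getAsLong();
      boolean waitedLongEnough = now - start >= TIMEOUT_MS;
      boolean enoughServers = count >= MIN_TO_START;
      boolean stable = now - lastCountChange >= INTERVAL_MS;
      if (waitedLongEnough && enoughServers && stable) {
        return count;
      }
      sleepBriefly.run();
      int current = countOfServers.getAsInt();
      if (current != count) { // a server reported in or went away: restart the stability window
        count = current;
        lastCountChange = clock.getAsLong();
      }
    }
  }
}
```

In a run where a second server reports in late, the stability window restarts. The master then waits 1.5 s past that change even though the 4.5 s minimum has already elapsed.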
8. Check ZooKeeper for region servers that are up but have not yet reported in to the master, and register them

```java
for (ServerName sn : this.regionServerTracker.getOnlineServers()) {
  // Up in ZooKeeper but unknown to ServerManager: record it with an empty load
  if (!this.serverManager.isServerOnline(sn)
      && serverManager.checkAndRecordNewServer(sn, ServerLoad.EMPTY_SERVERLOAD)) {
    LOG.info("Registered server found up in zk but who has not yet reported in: " + sn);
  }
}
```
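Step 8 is essentially a set difference followed by registration. The following is a minimal sketch, assuming server names are plain strings and the "load" is just a region count with 0 meaning empty. RegisterFromZk and registerMissing are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch: register servers that ZooKeeper lists but the server map does not know. */
final class RegisterFromZk {
  /** Adds missing servers to onlineLoads with an empty load and returns the names that were added. */
  static List<String> registerMissing(Collection<String> zkServers,
                                      Map<String, Integer> onlineLoads) {
    List<String> added = new ArrayList<>();
    for (String sn : zkServers) {
      if (!onlineLoads.containsKey(sn)) {
        onlineLoads.put(sn, 0); // record with an empty load (0 regions)
        added.add(sn);
      }
    }
    return added;
  }
}
```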
9. Start the AssignmentManager's timeout monitor (TimeoutMonitor)

```java
if (!masterRecovery) {
  this.assignmentManager.startTimeOutMonitor();
}
```
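The post does not say what the TimeoutMonitor checks. The following hypothetical helper assumes it periodically looks for regions that have stayed in transition longer than a configured timeout, and sketches only that core test. Region names are plain strings, the map values are the times of each region's last state change, and TransitionTimeoutCheck is an invented name.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

/** Hypothetical timeout check over regions in transition (assumed semantics, see lead-in). */
final class TransitionTimeoutCheck {
  /** Returns, sorted, the regions whose last state change is older than timeoutMs. */
  static List<String> timedOut(Map<String, Long> lastUpdateByRegion, long now, long timeoutMs) {
    List<String> result = new ArrayList<>();
    for (Map.Entry<String, Long> e : lastUpdateByRegion.entrySet()) {
      if (now - e.getValue() > timeoutMs) {
        result.add(e.getKey());
      }
    }
    Collections.sort(result); // deterministic order for callers and tests
    return result;
  }
}
```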
10. Call getFailedServersFromLogFolders() to find servers that have died but still have leftover logs under the WALs directory

```java
Set<ServerName> previouslyFailedServers =
    this.fileSystemManager.getFailedServersFromLogFolders();
```
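The idea of step 10 can be sketched over a list of directory names. This is a hypothetical sketch, not MasterFileSystem's code, and it rests on three assumptions:
- each per-server WAL directory is named after its server ("host,port,startcode");
- an optional "-splitting" suffix marks a directory whose split was already in progress;
- any directory whose server is not currently online belongs to a failed server.

```java
import java.util.Collection;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical sketch: derive failed servers from WAL directory names (assumptions in the lead-in). */
final class FailedServersFromWalDirs {
  static final String SPLITTING_SUFFIX = "-splitting";

  static Set<String> failedServers(Collection<String> walDirNames, Set<String> onlineServers) {
    Set<String> failed = new TreeSet<>();
    for (String dir : walDirNames) {
      // Strip the in-progress marker so the name matches the server name
      String server = dir.endsWith(SPLITTING_SUFFIX)
          ? dir.substring(0, dir.length() - SPLITTING_SUFFIX.length())
          : dir;
      if (!onlineServers.contains(server)) {
        failed.add(server); // logs remain, but the server is not online
      }
    }
    return failed;
  }
}
```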
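11. Call removeStaleRecoveringRegionsFromZK() to remove the servers found in the previous step from ZooKeeper. This does not delete any logs. It removes the ZooKeeper information about the failed servers, and each of their regions is later reassigned to a region server.

```java
this.fileSystemManager.removeStaleRecoveringRegionsFromZK(previouslyFailedServers);
```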
12. If the meta region was assigned to a failed server, split the meta log first

```java
ServerName oldMetaServerLocation = this.catalogTracker.getMetaLocation();
if (oldMetaServerLocation != null
    && previouslyFailedServers.contains(oldMetaServerLocation)) {
  splitMetaLogBeforeAssignment(oldMetaServerLocation);
}
```
13. Get the previously failed meta servers from ZooKeeper and merge them with the failed region servers into one set

```java
Set<ServerName> previouslyFailedMetaRSs = getPreviouselyFailedMetaServersFromZK();
previouslyFailedMetaRSs.addAll(previouslyFailedServers);
```
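Steps 12 and 13 can be condensed into one pure decision function, which makes the two outcomes easy to test separately. The following is a hypothetical sketch in which server names are plain strings and the returned record only captures decisions instead of acting on them. Unlike the excerpt, which calls addAll on the set returned from ZooKeeper, it builds a new merged set.

```java
import java.util.Collections;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical condensation of steps 12-13; MetaRecoveryPlan and its fields are invented names. */
record MetaRecoveryPlan(boolean splitMetaLogFirst, Set<String> mergedFailedServers) {
  static MetaRecoveryPlan of(String oldMetaLocation, Set<String> failedServers,
                             Set<String> failedMetaServersFromZk) {
    // Step 12: meta lived on a failed server, so its log must be split before meta is reassigned
    boolean split = oldMetaLocation != null && failedServers.contains(oldMetaLocation);
    // Step 13: union of the failed meta servers recorded in ZooKeeper and all failed servers
    Set<String> merged = new TreeSet<>(failedMetaServersFromZk);
    merged.addAll(failedServers);
    return new MetaRecoveryPlan(split, Collections.unmodifiableSet(merged));
  }
}
```

The merged set corresponds to what the excerpt passes to assignMeta() in step 15.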
14. Initialize the load balancer

```java
this.balancer.setClusterStatus(getClusterStatus());
this.balancer.setMasterServices(this);
this.balancer.initialize();
```
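The post describes the LoadBalancer only as moving regions between region servers. To make that concrete, the following sketch swaps in a deliberately simple greedy strategy for illustration. It is not claimed to match any of HBase's balancer implementations, and GreedyBalancer and Move are hypothetical names. The strategy repeatedly moves one region from the most-loaded server to the least-loaded one until the region counts differ by at most 1.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical greedy balancer used only to illustrate "moving regions between servers". */
final class GreedyBalancer {
  /** A planned move of one region between servers. */
  record Move(String region, String from, String to) {}

  static List<Move> balance(Map<String, List<String>> regionsByServer) {
    // Work on copies so the caller's assignment map is not modified
    Map<String, Deque<String>> load = new TreeMap<>();
    regionsByServer.forEach((s, rs) -> load.put(s, new ArrayDeque<>(rs)));
    List<Move> plan = new ArrayList<>();
    if (load.size() < 2) return plan;
    while (true) {
      String most = null, least = null;
      for (String s : load.keySet()) {
        if (most == null || load.get(s).size() > load.get(most).size()) most = s;
        if (least == null || load.get(s).size() < load.get(least).size()) least = s;
      }
      // Balanced once the spread between busiest and idlest server is at most one region
      if (load.get(most).size() - load.get(least).size() <= 1) return plan;
      String region = load.get(most).pollLast();
      load.get(least).addLast(region);
      plan.add(new Move(region, most, least));
    }
  }
}
```

Each move strictly reduces the sum of squared region counts, so the loop always terminates. Returning a plan instead of moving regions directly mirrors the split between deciding and executing assignments.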
15. Assign the meta region to a region server

```java
status.setStatus("Assigning meta Region");
assignMeta(status, previouslyFailedMetaRSs);
```
16. Process each previously failed server as a dead server

```java
for (ServerName tmpServer : previouslyFailedServers) {
  this.serverManager.processDeadServer(tmpServer, true);
}
```
17. Create and start clusterStatusChore, balancerChore, and catalogJanitorChore

```java
this.clusterStatusChore = getAndStartClusterStatusChore(this);
this.balancerChore = getAndStartBalancerChore(this);
this.catalogJanitorChore = new CatalogJanitor(this, this);
startCatalogJanitorChore();
```
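The following sketch assumes, as the names suggest, that these chores are periodic background tasks. A chore can then be modeled as a daemon thread that runs a task every period until it is stopped. MiniChore is a hypothetical class for illustration only, not HBase's Chore class.

```java
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical periodic task on a daemon thread; subclasses supply chore(). */
abstract class MiniChore implements Runnable {
  private final long periodMs;
  private final AtomicBoolean stopped = new AtomicBoolean(false);
  private final Thread thread;

  MiniChore(String name, long periodMs) {
    this.periodMs = periodMs;
    this.thread = new Thread(this, name);
    this.thread.setDaemon(true); // does not keep the JVM alive on its own
  }

  /** The periodic work, e.g. balancing or janitorial cleanup. */
  protected abstract void chore();

  @Override
  public void run() {
    while (!stopped.get()) {
      chore();
      try {
        Thread.sleep(periodMs);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        return; // interrupted by stopAndJoin(): exit the loop
      }
    }
  }

  void start() { thread.start(); }

  /** Request a stop, wake the thread if it is sleeping, and wait for it to exit. */
  void stopAndJoin() {
    stopped.set(true);
    thread.interrupt();
    try {
      thread.join();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }
}
```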
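18. Clean up the dead-server list by removing entries that have the same hostname and port as a currently online server

```java
this.serverManager.clearDeadServersWithSameHostNameAndPortOfOnlineServer();
```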
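The following sketch assumes the method does what its name says, and that a match means the region server at that address restarted with a new start code. Under those assumptions the cleanup is a filter on host and port. ServerName here is a local record (host, port, start code) standing in for HBase's class, and DeadServerCleanup and clearRestarted are hypothetical names.

```java
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;

/** Hypothetical sketch: drop dead-server entries whose address now belongs to an online server. */
final class DeadServerCleanup {
  record ServerName(String host, int port, long startCode) {
    String hostAndPort() { return host + ":" + port; }
  }

  /** Removes matching entries from deadServers in place and returns how many were removed. */
  static int clearRestarted(Set<ServerName> deadServers, Collection<ServerName> onlineServers) {
    Set<String> onlineAddrs = new HashSet<>();
    for (ServerName sn : onlineServers) {
      onlineAddrs.add(sn.hostAndPort());
    }
    int before = deadServers.size();
    // Same host and port, different start code: treated as the restarted server
    deadServers.removeIf(dead -> onlineAddrs.contains(dead.hostAndPort()));
    return before - deadServers.size();
  }
}
```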