2021SC@SDUSC HBase项目分析:Master启动(三)

2021SC@SDUSC HBase项目分析:Master启动(三),第1张

2021SC@SDUSC HBase项目分析:Master启动(三)

2021SC@SDUSC

目录

Master启动过程源码分析


2021SC@SDUSC

接上一篇博客继续分析,这篇博客主要分析finishInitialization()方法

Master启动过程源码分析

finishInitialization()方法负责初始化master主功能,该方法很长,在上一篇博客中已贴出其全部代码,这篇博客不再全部贴出,会在接下来的分析过程中贴出部分相关的代码

1. 设置本Master的isActiveMaster标志为true

    isActiveMaster = true;

 2. 设置masterActiveTime为当前系统时间

    this.masterActiveTime = System.currentTimeMillis();

3. 初始化 MasterFileSystem,该对象封装了master常用的文件系统 *** 作,包括、删除region目录、删除table目录、删除cf目录、检查文件系统状态等,然后创建FSTableDescriptors对象

    this.fileSystemManager = new MasterFileSystem(this, this, masterRecovery);

    this.tableDescriptors =
      new FSTableDescriptors(this.conf, this.fileSystemManager.getFileSystem(),
      this.fileSystemManager.getRootDir());

4. 将cluster id写入zookeeper的“hbaseid” ZNode,接着初始化ExecutorService和ServerManager。ExecutorService会维护一个ExecutorMap,一种Event对应一个Executor,可以通过提交EventHandler来执行异步事件;ServerManager负责管理regionserver信息,维护着online regionserver 和dead regionserver列表,处理regionserver的startups、shutdowns、 deaths等

    status.setStatus("Publishing Cluster ID in ZooKeeper");
    ZKClusterId.setClusterId(this.zooKeeper, fileSystemManager.getClusterId());

    if (!masterRecovery) {
      this.executorService = new ExecutorService(getServerName().toShortString());
      this.serverManager = createServerManager(this, this);
    }

5. 调用initializeZKbasedSystemTrackers()来初始化各种基于zookeeper的组件

    initializeZKbasedSystemTrackers();

查看initializeZKbasedSystemTrackers()方法:a) 创建CatalogTracker, CatalogTracker包含RootRegionTracker和metaNodeTracker,分别对应”/hbase/root-region-server”和/”hbase/unassigned/1028785192”这两个结点;b) 创建LoadBalancer,LoadBalancer负责region在regionserver之间的移动;c) 创建AssignmentManager,AssignmentManager负责管理和分配region,同时它也会接受zk上关于region的event,根据event来完成region的上下线、关闭、打开等工作;d) 创建 RegionServerTracker,RegionServerTracker负责监控”/hbase/rs”结点,通过ZK的Event来跟踪online regionservers,如果有rs下线,则删除ServerManager中对应的online regions;e) 创建 DrainingServerTracker,DrainingServerTracker负责监控”/hbase/draining”结点;f) 创建 ClusterStatusTracker,ClusterStatusTracker负责监控”/hbase/shutdown”结点维护集群状态;g) 创建SnapshotManager,SnapshotManager负责管理Hbase快照功能,包括创建快照,恢复快照

 void initializeZKbasedSystemTrackers() throws IOException,
      InterruptedException, KeeperException {
    this.catalogTracker = createCatalogTracker(this.zooKeeper, this.conf, this);
    this.catalogTracker.start();

    this.balancer = LoadBalancerFactory.getLoadBalancer(conf);
    this.loadBalancerTracker = new LoadBalancerTracker(zooKeeper, this);
    this.loadBalancerTracker.start();
    this.assignmentManager = new AssignmentManager(this, serverManager,
      this.catalogTracker, this.balancer, this.executorService, this.metricsMaster,
      this.tableLockManager);
    zooKeeper.registerListenerFirst(assignmentManager);

    this.regionServerTracker = new RegionServerTracker(zooKeeper, this,
        this.serverManager);
    this.regionServerTracker.start();

    this.drainingServerTracker = new DrainingServerTracker(zooKeeper, this,
      this.serverManager);
    this.drainingServerTracker.start();

    boolean wasUp = this.clusterStatusTracker.isClusterUp();
    if (!wasUp) this.clusterStatusTracker.setClusterUp();

    LOG.info("Server active/primary master=" + this.serverName +
        ", sessionid=0x" +
        Long.toHexString(this.zooKeeper.getRecoverableZooKeeper().getSessionId()) +
        ", setting cluster-up flag (Was=" + wasUp + ")");

    this.snapshotManager = new SnapshotManager();
    this.mpmHost = new MasterProcedureManagerHost();
    this.mpmHost.register(this.snapshotManager);
    this.mpmHost.loadProcedures(conf);
    this.mpmHost.initialize(this, this.metricsMaster);
  }

6. 初始化MasterCoprocessorHost,并调用startServiceThreads()方法来启动各种服务,MasterCoprocessorHost是负责管理所有master相关 *** 作的协处理器

    if (!masterRecovery) {
      status.setStatus("Initializing master coprocessors");
      this.cpHost = new MasterCoprocessorHost(this, this.conf);

      spanReceiverHost = SpanReceiverHost.getInstance(getConfiguration());

      status.setStatus("Initializing master service threads");
      startServiceThreads();
    }

查看startServiceThreads()方法:首先启动MASTER_OPEN_REGION、MASTER_CLOSE_REGION、MASTER_SERVER_OPERATIONS、MASTER_meta_SERVER_OPERATIONS、MASTER_TABLE_OPERATIONS这五个事件的处理线程池,然后创建logCleaner并启动、创建hfileCleaner并启动、启动healthCheckChore,最后调用RpcServer的openServer()方法,允许响应请求:

  void startServiceThreads() throws IOException{
   this.executorService.startExecutorService(ExecutorType.MASTER_OPEN_REGION,
      conf.getInt("hbase.master.executor.openregion.threads", 5));
   this.executorService.startExecutorService(ExecutorType.MASTER_CLOSE_REGION,
      conf.getInt("hbase.master.executor.closeregion.threads", 5));
   this.executorService.startExecutorService(ExecutorType.MASTER_SERVER_OPERATIONS,
      conf.getInt("hbase.master.executor.serverops.threads", 5));
   this.executorService.startExecutorService(ExecutorType.MASTER_meta_SERVER_OPERATIONS,
      conf.getInt("hbase.master.executor.serverops.threads", 5));
   this.executorService.startExecutorService(ExecutorType.M_LOG_REPLAY_OPS,
      conf.getInt("hbase.master.executor.logreplayops.threads", 10));

   this.executorService.startExecutorService(ExecutorType.MASTER_TABLE_OPERATIONS, 1);

   String n = Thread.currentThread().getName();
   int cleanerInterval = conf.getInt("hbase.master.cleaner.interval", 60 * 1000);
   this.logCleaner =
      new LogCleaner(cleanerInterval,
         this, conf, getMasterFileSystem().getFileSystem(),
         getMasterFileSystem().getOldLogDir());
         Threads.setDaemonThreadRunning(logCleaner.getThread(), n + ".oldLogCleaner");

    Path archiveDir = HFileArchiveUtil.getArchivePath(conf);
    this.hfileCleaner = new HFileCleaner(cleanerInterval, this, conf, getMasterFileSystem()
        .getFileSystem(), archiveDir);
    Threads.setDaemonThreadRunning(hfileCleaner.getThread(), n + ".archivedHFileCleaner");

    if (this.healthCheckChore != null) {
      Threads.setDaemonThreadRunning(this.healthCheckChore.getThread(), n + ".healthChecker");
    }

    this.rpcServer.openServer();
    this.rpcServerOpen = true;
    if (LOG.isTraceEnabled()) {
      LOG.trace("Started service threads");
    }
  }

7. 调用ServerManager.waitForRegionServers()来等待所有region server报告,当满足以下条件时返回:

  • 至少等待4.5s(hbase.master.wait.on.regionservers.timeout)
  • 成功启动的regionserver节点数>=1(hbase.master.wait.on.regionservers.mintostart)
  • 1.5s(hbase.master.wait.on.regionservers.interval)内没有regionsever挂掉或重新启动
    this.serverManager.waitForRegionServers(status);

8. 检查zk中已启动但未注册的regionserver,并将这些regionserver注册

    for (ServerName sn: this.regionServerTracker.getonlineServers()) {
      if (!this.serverManager.isServeronline(sn)
          && serverManager.checkAndRecordNewServer(sn, ServerLoad.EMPTY_SERVERLOAD)) {
        LOG.info("Registered server found up in zk but who has not yet reported in: " + sn);
      }
    }

9. 启动AssignmentManager的超时监控,即TimeoutMonitor

    if (!masterRecovery) {
      this.assignmentManager.startTimeOutMonitor();
    }

10. 调用getFailedServersFromLogFolders()方法获取已挂掉,却在WALs目录下仍然遗留log的server

    Set previouslyFailedServers = this.fileSystemManager
        .getFailedServersFromLogFolders();

11. 调用removeStaleRecoveringRegionsFromZK()方法从ZooKeerper中删除上一步找到的server,此处并不是删除log,而是删除ZooKeeper中挂掉的server信息,后面会对每个region重新分配region server

    this.fileSystemManager.removeStaleRecoveringRegionsFromZK(previouslyFailedServers);

12.  如果meta表被分配给挂掉的server,则先分离meta log

    ServerName oldmetaServerLocation = this.catalogTracker.getmetaLocation();
    if (oldmetaServerLocation != null && previouslyFailedServers.contains(oldmetaServerLocation)) {
      splitmetaLogBeforeAssignment(oldmetaServerLocation);
    }

13. 从ZooKeeper获取之前挂掉的meta server,合并挂掉的meta server和region server列表

  Set previouslyFailedmetaRSs = getPreviouselyFailedmetaServersFromZK();
  previouslyFailedmetaRSs.addAll(previouslyFailedServers);

14. 初始化load balancer

    this.balancer.setClusterStatus(getClusterStatus());
    this.balancer.setMasterServices(this);
    this.balancer.initialize();

15. 分配meta region到region server

    status.setStatus("Assigning meta Region");
    assignmeta(status, previouslyFailedmetaRSs);

16. 处理挂掉的server

    for (ServerName tmpServer : previouslyFailedServers) {
      this.serverManager.processDeadServer(tmpServer, true);
    }

17. 创建并启动clusterStatusChore、balancerChore、catalogJanitorChore

      this.clusterStatusChore = getAndStartClusterStatusChore(this);
      this.balancerChore = getAndStartBalancerChore(this);
      this.catalogJanitorChore = new CatalogJanitor(this, this);
      startCatalogJanitorChore();

18. 清理挂掉的server

    this.serverManager.clearDeadServersWithSameHostNameAndPortOfonlineServer();

 

 

 

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5479704.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-12
下一篇 2022-12-12

发表评论

登录后才能评论

评论列表(0条)

保存