
Hadoop Source Code Analysis (25): ZKFC Source Code Analysis

  Parts (4) through (24) analyzed the namenode startup process in detail. At the end of that process the namenode starts in standby mode. A namenode in standby mode does not serve client requests; one namenode must be selected and transitioned to active mode, and it is the active namenode that serves clients.

  Transitioning a standby namenode to active mode is carried out by ZKFC (the ZKFailoverController). The analysis starts with how zkfc is launched.

  As with the namenode, the hdfs script under the bin directory records the Java class that the zkfc command maps to: the class actually invoked is DFSZKFailoverController. Its main method is as follows:

public static void main(String args[])
      throws Exception {
    if (DFSUtil.parseHelpArgument(args, 
        ZKFailoverController.USAGE, System.out, true)) {
      System.exit(0);
    }

    GenericOptionsParser parser = new GenericOptionsParser(
        new HdfsConfiguration(), args);
    DFSZKFailoverController zkfc = DFSZKFailoverController.create(
        parser.getConfiguration());
    int retCode = 0;
    try {
      retCode = zkfc.run(parser.getRemainingArgs());
    } catch (Throwable t) {
      LOG.fatal("Got a fatal error, exiting now", t);
    }
    System.exit(retCode);
  }

  Much like the namenode's main method, this first parses the command-line arguments (lines 3 to 9), then creates a zkfc object via DFSZKFailoverController.create (line 10), and finally calls its run method to start the service (line 14).

  The create method used to build the zkfc object is as follows:

public static DFSZKFailoverController create(Configuration conf) {
    Configuration localNNConf = DFSHAAdmin.addSecurityConfiguration(conf);
    String nsId = DFSUtil.getNamenodeNameServiceId(conf);

    if (!HAUtil.isHAEnabled(localNNConf, nsId)) {
      throw new HadoopIllegalArgumentException(
          "HA is not enabled for this namenode.");
    }
    String nnId = HAUtil.getNameNodeId(localNNConf, nsId);
    if (nnId == null) {
      String msg = "Could not get the namenode ID of this node. " +
          "You may run zkfc on the node other than namenode.";
      throw new HadoopIllegalArgumentException(msg);
    }
    NameNode.initializeGenericKeys(localNNConf, nsId, nnId);
    DFSUtil.setGenericConf(localNNConf, nsId, nnId, ZKFC_CONF_KEYS);

    NNHAServiceTarget localTarget = new NNHAServiceTarget(
        localNNConf, nsId, nnId);
    return new DFSZKFailoverController(localNNConf, localTarget);
  }

  The key point is the last line, which directly creates a DFSZKFailoverController object. Its constructor is as follows:

private DFSZKFailoverController(Configuration conf,
      NNHAServiceTarget localTarget) {
    super(conf, localTarget);
    this.localNNTarget = localTarget;
    // Setup ACLs
    adminAcl = new AccessControlList(
        conf.get(DFSConfigKeys.DFS_ADMIN, " "));
    LOG.info("Failover controller configured for NameNode " +
        localTarget);
}

  This constructor mostly performs field assignments.

  Next, look at this class's run method:

 public int run(final String[] args) throws Exception {
    if (!localTarget.isAutoFailoverEnabled()) {
      LOG.fatal("Automatic failover is not enabled for " + localTarget + "." +
          " Please ensure that automatic failover is enabled in the " +
          "configuration before running the ZK failover controller.");
      return ERR_CODE_AUTO_FAILOVER_NOT_ENABLED;
    }
    loginAsFCUser();
    try {
      return SecurityUtil.doAsLoginUserOrFatal(new PrivilegedAction<Integer>() {
        @Override
        public Integer run() {
          try {
            return doRun(args);
          } catch (Exception t) {
            throw new RuntimeException(t);
          } finally {
            if (elector != null) {
              elector.terminateConnection();
            }
          }
        }
      });
    } catch (RuntimeException rte) {
      throw (Exception)rte.getCause();
    }
  }

  The heart of this method is a single line: the call to doRun on line 14. The doRun method is as follows:

private int doRun(String[] args)
      throws HadoopIllegalArgumentException, IOException, InterruptedException {
    try {
      initZK();
    } catch (KeeperException ke) {
      LOG.fatal("Unable to start failover controller. Unable to connect "
          + "to ZooKeeper quorum at " + zkQuorum + ". Please check the "
          + "configured value for " + ZK_QUORUM_KEY + " and ensure that "
          + "ZooKeeper is running.");
      return ERR_CODE_NO_ZK;
    }
    if (args.length > 0) {
      if ("-formatZK".equals(args[0])) {
        boolean force = false;
        boolean interactive = true;
        for (int i = 1; i < args.length; i++) {
          if ("-force".equals(args[i])) {
            force = true;
          } else if ("-nonInteractive".equals(args[i])) {
            interactive = false;
          } else {
            badArg(args[i]);
          }
        }
        return formatZK(force, interactive);
      } else {
        badArg(args[0]);
      }
    }

    if (!elector.parentZNodeExists()) {
      LOG.fatal("Unable to start failover controller. "
          + "Parent znode does not exist.\n"
          + "Run with -formatZK flag to initialize ZooKeeper.");
      return ERR_CODE_NO_PARENT_ZNODE;
    }

    try {
      localTarget.checkFencingConfigured();
    } catch (BadFencingConfigurationException e) {
      LOG.fatal("Fencing is not configured for " + localTarget + ".\n" +
          "You must configure a fencing method before using automatic " +
          "failover.", e);
      return ERR_CODE_NO_FENCER;
    }

    initRPC();
    initHM();
    startRPC();
    try {
      mainLoop();
    } finally {
      rpcServer.stopAndJoin();

      elector.quitElection(true);
      healthMonitor.shutdown();
      healthMonitor.join();
    }
    return 0;
  }

  The first key point is the initZK method on line 4, which creates a ZooKeeper client and connects to the ZooKeeper quorum. Lines 12 to 29 handle the request to format ZK (the -formatZK, -force and -nonInteractive flags parsed here correspond to running `hdfs zkfc -formatZK [-force | -nonInteractive]`). Lines 31 to 45 perform sanity checks: the parent znode must exist and a fencing method must be configured. The second key point is lines 47 to 49, which initialize the RPC server and the HM and then start the RPC server. The RPC service was analyzed in earlier parts; the more important piece here is HM, short for healthMonitor. Finally, the mainLoop call on line 51 blocks until the controller should exit.
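  Before going into initZK, it is worth seeing where its settings come from. Below is a minimal sketch of supplying them programmatically; the key names are the usual ha.zookeeper.* / dfs.ha.* ones and the values are placeholders (in a real deployment they would live in core-site.xml and hdfs-site.xml rather than be set in code):

import org.apache.hadoop.conf.Configuration;

// Minimal sketch (not from the Hadoop source): the ZooKeeper-related settings
// that the failover controller reads -- quorum, session timeout, parent znode --
// plus the flag that run() checks via localTarget.isAutoFailoverEnabled().
public class ZkfcConfSketch {
  public static Configuration build() {
    Configuration conf = new Configuration();
    conf.set("ha.zookeeper.quorum", "zk1:2181,zk2:2181,zk3:2181");  // placeholder hosts
    conf.setInt("ha.zookeeper.session-timeout.ms", 5000);
    conf.set("ha.zookeeper.parent-znode", "/hadoop-ha");
    conf.setBoolean("dfs.ha.automatic-failover.enabled", true);
    return conf;
  }
}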

  Start with the initZK method that connects to ZooKeeper:

private void initZK() throws HadoopIllegalArgumentException, IOException,
      KeeperException {
    zkQuorum = conf.get(ZK_QUORUM_KEY);
    int zkTimeout = conf.getInt(ZK_SESSION_TIMEOUT_KEY,
        ZK_SESSION_TIMEOUT_DEFAULT);
    // Parse ACLs from configuration.
    String zkAclConf = conf.get(ZK_ACL_KEY, ZK_ACL_DEFAULT);
    zkAclConf = ZKUtil.resolveConfIndirection(zkAclConf);
    List<ACL> zkAcls = ZKUtil.parseACLs(zkAclConf);
    if (zkAcls.isEmpty()) {
      zkAcls = Ids.CREATOR_ALL_ACL;
    }

    // Parse authentication from configuration.
    String zkAuthConf = conf.get(ZK_AUTH_KEY);
    zkAuthConf = ZKUtil.resolveConfIndirection(zkAuthConf);
    List<ZKAuthInfo> zkAuths;
    if (zkAuthConf != null) {
      zkAuths = ZKUtil.parseAuth(zkAuthConf);
    } else {
      zkAuths = Collections.emptyList();
    }

    // Sanity check configuration.
    Preconditions.checkArgument(zkQuorum != null,
        "Missing required configuration '%s' for ZooKeeper quorum",
        ZK_QUORUM_KEY);
    Preconditions.checkArgument(zkTimeout > 0,
        "Invalid ZK session timeout %s", zkTimeout);

    int maxRetryNum = conf.getInt(
        CommonConfigurationKeys.HA_FC_ELECTOR_ZK_OP_RETRIES_KEY,
        CommonConfigurationKeys.HA_FC_ELECTOR_ZK_OP_RETRIES_DEFAULT);
    elector = new ActiveStandbyElector(zkQuorum,
        zkTimeout, getParentZnode(), zkAcls, zkAuths,
        new ElectorCallbacks(), maxRetryNum);
  }

  This method reads a series of configuration values and then uses them to create an ActiveStandbyElector object (line 34). That object's constructor is as follows:

public ActiveStandbyElector(String zookeeperHostPorts,
      int zookeeperSessionTimeout, String parentZnodeName, List<ACL> acl,
      List<ZKAuthInfo> authInfo,
      ActiveStandbyElectorCallback app, int maxRetryNum) throws IOException,
      HadoopIllegalArgumentException, KeeperException {
    if (app == null || acl == null || parentZnodeName == null
        || zookeeperHostPorts == null || zookeeperSessionTimeout <= 0) {
      throw new HadoopIllegalArgumentException("Invalid argument");
    }
    zkHostPort = zookeeperHostPorts;
    zkSessionTimeout = zookeeperSessionTimeout;
    zkAcl = acl;
    zkAuthInfo = authInfo;
    appClient = app;
    znodeWorkingDir = parentZnodeName;
    zkLockFilePath = znodeWorkingDir + "/" + LOCK_FILENAME;
    zkBreadCrumbPath = znodeWorkingDir + "/" + BREADCRUMB_FILENAME;
    this.maxRetryNum = maxRetryNum;

    // createConnection for future API calls
    createConnection();
  }

  This mostly assigns the constructor arguments to fields; the key point is the createConnection call on line 21, which creates the client connection to ZooKeeper. Its content is as follows:

private void createConnection() throws IOException, KeeperException {
    if (zkClient != null) {
      try {
        zkClient.close();
      } catch (InterruptedException e) {
        throw new IOException("Interrupted while closing ZK",
            e);
      }
      zkClient = null;
      watcher = null;
    }
    zkClient = connectToZooKeeper();
    LOG.debug("Created new connection for " + this);
  }

  This first checks whether zkClient is non-null; if so, the existing client is closed before connectToZooKeeper is called to create a new one. The connectToZooKeeper method is as follows:

protected synchronized ZooKeeper connectToZooKeeper() throws IOException,
      KeeperException {

    // Unfortunately, the ZooKeeper constructor connects to ZooKeeper and
    // may trigger the Connected event immediately. So, if we register the
    // watcher after constructing ZooKeeper, we may miss that event. Instead,
    // we construct the watcher first, and have it block any events it receives
    // before we can set its ZooKeeper reference.
    watcher = new WatcherWithClientRef();
    ZooKeeper zk = createZooKeeper();
    watcher.setZooKeeperRef(zk);

    // Wait for the asynchronous success/failure. This may throw an exception
    // if we don't connect within the session timeout.
    watcher.waitForZKConnectionEvent(zkSessionTimeout);

    for (ZKAuthInfo auth : zkAuthInfo) {
      zk.addAuthInfo(auth.getScheme(), auth.getAuth());
    }
    return zk;
  }

 Two objects are created here: watcher and zk. zk is the ZooKeeper client, while watcher monitors and handles ZooKeeper events. As the comment explains, the watcher must be constructed before the ZooKeeper handle so that no connection event is missed, and the caller then blocks until the connection event arrives.
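  To make that ordering concern concrete, here is a small, self-contained sketch of the same idea using the plain ZooKeeper client API: build the watcher first, create the ZooKeeper handle with it, then block until the SyncConnected event arrives or the timeout expires. The ConnectionWatcher class below is illustrative, not Hadoop's WatcherWithClientRef (which additionally holds a reference back to the ZooKeeper handle so events are routed to the right session):

import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooKeeper;

public class ConnectSketch {

  // Counts down once the session reports SyncConnected.
  static class ConnectionWatcher implements Watcher {
    private final CountDownLatch connected = new CountDownLatch(1);

    @Override
    public void process(WatchedEvent event) {
      if (event.getState() == KeeperState.SyncConnected) {
        connected.countDown();
      }
    }

    boolean awaitConnection(long timeoutMs) throws InterruptedException {
      return connected.await(timeoutMs, TimeUnit.MILLISECONDS);
    }
  }

  public static ZooKeeper connect(String quorum, int sessionTimeoutMs)
      throws IOException, InterruptedException {
    // The watcher exists before the client, so the Connected event cannot be missed.
    ConnectionWatcher watcher = new ConnectionWatcher();
    ZooKeeper zk = new ZooKeeper(quorum, sessionTimeoutMs, watcher);
    if (!watcher.awaitConnection(sessionTimeoutMs)) {
      zk.close();
      throw new IOException("Timed out waiting for ZooKeeper connection to " + quorum);
    }
    return zk;
  }
}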

  Once the ZooKeeper initialization is done, the next key step for zkfc is initializing the HM. The initHM method is as follows:

  private void initHM() {
    healthMonitor = new HealthMonitor(conf, localTarget);
    healthMonitor.addCallback(new HealthCallbacks());
    healthMonitor.addServiceStateCallback(new ServiceStateCallBacks());
    healthMonitor.start();
  }

  Line 2 creates the HealthMonitor object, line 3 registers a HealthCallbacks object with it, line 4 registers a ServiceStateCallBacks object, and line 5 starts the HealthMonitor.

  The HealthMonitor constructor is as follows:

HealthMonitor(Configuration conf, HAServiceTarget target) {
    this.targetToMonitor = target;
    this.conf = conf;

    this.sleepAfterDisconnectMillis = conf.getLong(
        HA_HM_SLEEP_AFTER_DISCONNECT_KEY,
        HA_HM_SLEEP_AFTER_DISCONNECT_DEFAULT);
    this.checkIntervalMillis = conf.getLong(
        HA_HM_CHECK_INTERVAL_KEY,
        HA_HM_CHECK_INTERVAL_DEFAULT);
    this.connectRetryInterval = conf.getLong(
        HA_HM_CONNECT_RETRY_INTERVAL_KEY,
        HA_HM_CONNECT_RETRY_INTERVAL_DEFAULT);
    this.rpcTimeout = conf.getInt(
        HA_HM_RPC_TIMEOUT_KEY,
        HA_HM_RPC_TIMEOUT_DEFAULT);

    this.daemon = new MonitorDaemon();
  }

  This is mostly field assignment; the key point is the daemon field on the last line, which is set to a MonitorDaemon object. MonitorDaemon extends Thread, so it is a thread class.

  Next are the two methods for registering callbacks:

 public void addCallback(Callback cb) {
    this.callbacks.add(cb);
  }

  public synchronized void addServiceStateCallback(ServiceStateCallback cb) {
    this.serviceStateCallbacks.add(cb);
  }

  Both methods simply add the given object to the corresponding callback list.
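  These lists matter because HealthMonitor fans state changes out to every registered callback; this is how the HealthCallbacks created in initHM learns that the namenode became healthy or unhealthy. Below is a minimal, self-contained sketch of that observer-style dispatch; the names only loosely mirror the Hadoop code, and the real enterState does more bookkeeping:

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Minimal sketch of the observer pattern used by HealthMonitor: callbacks are
// collected in a list and each state change is fanned out to all of them.
// Callback, State and enterState here are illustrative, not the exact Hadoop types.
public class MonitorSketch {
  enum State { INITIALIZING, SERVICE_HEALTHY, SERVICE_UNHEALTHY, SERVICE_NOT_RESPONDING }

  interface Callback {
    void enteredState(State newState);
  }

  private final List<Callback> callbacks = new CopyOnWriteArrayList<>();
  private volatile State state = State.INITIALIZING;

  public void addCallback(Callback cb) {
    callbacks.add(cb);
  }

  synchronized void enterState(State newState) {
    if (newState != state) {
      state = newState;
      for (Callback cb : callbacks) {
        cb.enteredState(newState);
      }
    }
  }
}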

  Finally, the start method:

  void start() {
    daemon.start();
  }

  As noted above, daemon is a thread, so start here launches that thread. Its run method is as follows:

   @Override
    public void run() {
      while (shouldRun) {
        try { 
          loopUntilConnected();
          doHealthChecks();
        } catch (InterruptedException ie) {
          Preconditions.checkState(!shouldRun,
              "Interrupted but still supposed to run");
        }
      }
    }

  The key points are the loopUntilConnected call on line 5 and the doHealthChecks call on line 6. This thread performs health checks on the namenode, and only a namenode that passes the health check joins the election. loopUntilConnected blocks until a connection to the namenode is established, and doHealthChecks performs the actual health checks.

  The loopUntilConnected method is as follows:

 private void loopUntilConnected() throws InterruptedException {
    tryConnect();
    while (proxy == null) {
      Thread.sleep(connectRetryInterval);
      tryConnect();
    }
    assert proxy != null;
  }

  This is straightforward: a while loop keeps calling tryConnect until a proxy is obtained, sleeping connectRetryInterval between attempts. The tryConnect method is as follows:

 private void tryConnect() {
    Preconditions.checkState(proxy == null);

    try {
      synchronized (this) {
        proxy = createProxy();
      }
    } catch (IOException e) {
      LOG.warn("Could not connect to local service at " + targetToMonitor +
          ": " + e.getMessage());
      proxy = null;
      enterState(State.SERVICE_NOT_RESPONDING);
    }
  }

 This method is also simple: the main work is the createProxy call on line 6, which creates the proxy object. createProxy is as follows:

  protected HAServiceProtocol createProxy() throws IOException {
    return targetToMonitor.getProxy(conf, rpcTimeout);
  }

  The getProxy method it calls is as follows:

public HAServiceProtocol getProxy(Configuration conf, int timeoutMs)
      throws IOException {
    Configuration confCopy = new Configuration(conf);
    // Lower the timeout so we quickly fail to connect
    confCopy.setInt(CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 1);
    SocketFactory factory = NetUtils.getDefaultSocketFactory(confCopy);
    return new HAServiceProtocolClientSideTranslatorPB(
        getAddress(),
        confCopy, factory, timeoutMs);
  }

  What is ultimately created is an HAServiceProtocolClientSideTranslatorPB object; its constructor is as follows:

  public HAServiceProtocolClientSideTranslatorPB(
      InetSocketAddress addr, Configuration conf,
      SocketFactory socketFactory, int timeout) throws IOException {
    RPC.setProtocolEngine(conf, HAServiceProtocolPB.class,
        ProtobufRpcEngine.class);
    rpcProxy = RPC.getProxy(HAServiceProtocolPB.class,
        RPC.getProtocolVersion(HAServiceProtocolPB.class), addr,
        UserGroupInformation.getCurrentUser(), conf, socketFactory, timeout);
  }

  Line 6 calls RPC.getProxy to obtain the proxy object, which is the same way the HDFS RPC client obtains proxies, as analyzed in earlier parts. So the connection to the namenode is an RPC connection.

  The health check is therefore implemented in a clear way: connect to the corresponding NameNode over RPC and let that namenode perform the actual check.
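  As a rough usage sketch, each check boils down to the few RPC calls below. The host, port and timeout are placeholders chosen for illustration; in ZKFC the address comes from the NNHAServiceTarget and the timeout is the rpcTimeout read in the HealthMonitor constructor:

import java.net.InetSocketAddress;

import javax.net.SocketFactory;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ha.HAServiceProtocol;
import org.apache.hadoop.ha.HAServiceStatus;
import org.apache.hadoop.ha.protocolPB.HAServiceProtocolClientSideTranslatorPB;
import org.apache.hadoop.net.NetUtils;

// Rough sketch of what HealthMonitor does per check: open an HAServiceProtocol
// proxy to the NameNode's service RPC address and call getServiceStatus() and
// monitorHealth(). The address below is hypothetical.
public class HealthCheckSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    InetSocketAddress nnServiceAddr =
        new InetSocketAddress("nn1.example.com", 8022);        // hypothetical address
    SocketFactory factory = NetUtils.getDefaultSocketFactory(conf);

    HAServiceProtocol proxy = new HAServiceProtocolClientSideTranslatorPB(
        nnServiceAddr, conf, factory, 45000 /* rpc timeout, ms */);

    HAServiceStatus status = proxy.getServiceStatus();         // ACTIVE / STANDBY, etc.
    proxy.monitorHealth();                                     // throws HealthCheckFailedException if unhealthy
    System.out.println("NameNode state: " + status.getState());
  }
}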

  The specific remote calls are made in the doHealthChecks method mentioned above, whose content is as follows:

private void doHealthChecks() throws InterruptedException {
    while (shouldRun) {
      HAServiceStatus status = null;
      boolean healthy = false;
      try {
        status = proxy.getServiceStatus();
        proxy.monitorHealth();
        healthy = true;
      } catch (Throwable t) {
        if (isHealthCheckFailedException(t)) {
          LOG.warn("Service health check failed for " + targetToMonitor
              + ": " + t.getMessage());
          enterState(State.SERVICE_UNHEALTHY);
        } else {
          LOG.warn("Transport-level exception trying to monitor health of " +
              targetToMonitor + ": " + t.getCause() + " " + t.getLocalizedMessage());
          RPC.stopProxy(proxy);
          proxy = null;
          enterState(State.SERVICE_NOT_RESPONDING);
          Thread.sleep(sleepAfterDisconnectMillis);
          return;
        }
      }

      if (status != null) {
        setLastServiceStatus(status);
      }
      if (healthy) {
        enterState(State.SERVICE_HEALTHY);
      }

      Thread.sleep(checkIntervalMillis);
    }
  }

  Lines 6 and 7 make two remote calls: getServiceStatus and monitorHealth. If both succeed, the namenode has passed the health check and healthy is set to true; then, on line 29, enterState is called to move the monitor into the SERVICE_HEALTHY state. If a call fails, the catch block instead enters SERVICE_UNHEALTHY or SERVICE_NOT_RESPONDING, depending on whether the failure was a health-check failure or a transport-level error.
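  This health state is what ultimately drives the failover described at the beginning: the HealthCallbacks registered in initHM receives each state change and tells the elector to join or quit the ZooKeeper election. The following is a simplified, self-contained sketch of that decision only; the names and structure are illustrative, and the actual logic lives in ZKFailoverController's callback handling:

// Simplified sketch of how the monitored health state drives the election:
// a healthy NameNode participates in the ZooKeeper election (and may become
// active), while an unhealthy or unreachable one withdraws from it.
public class ElectionDecisionSketch {
  enum State { SERVICE_HEALTHY, SERVICE_UNHEALTHY, SERVICE_NOT_RESPONDING }

  interface Elector {
    void joinElection();
    void quitElection();
  }

  static void onHealthState(State newState, Elector elector) {
    switch (newState) {
      case SERVICE_HEALTHY:
        elector.joinElection();   // eligible to be elected active
        break;
      case SERVICE_UNHEALTHY:
      case SERVICE_NOT_RESPONDING:
        elector.quitElection();   // step aside so the other NameNode can take over
        break;
    }
  }
}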
