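Documents (4) through (24) analyzed the namenode startup process in detail. In the end, the namenode starts in standby mode. A namenode in standby mode does not serve clients, however. One namenode must be chosen and transitioned to active mode, and the active namenode then serves clients.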
ZKFC (the ZooKeeper failover controller) performs the transition of a standby namenode to active mode. The analysis therefore starts with the startup of zkfc.
As with the namenode, the hdfs script in the bin directory records the Java class that corresponds to zkfc. The class actually invoked is DFSZKFailoverController, and its main method is as follows:
```java
public static void main(String args[])
    throws Exception {
  if (DFSUtil.parseHelpArgument(args,
      ZKFailoverController.USAGE, System.out, true)) {
    System.exit(0);
  }
  GenericOptionsParser parser = new GenericOptionsParser(
      new HdfsConfiguration(), args);
  DFSZKFailoverController zkfc = DFSZKFailoverController.create(
      parser.getConfiguration());
  int retCode = 0;
  try {
    retCode = zkfc.run(parser.getRemainingArgs());
  } catch (Throwable t) {
    LOG.fatal("Got a fatal error, exiting now", t);
  }
  System.exit(retCode);
}
```
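This mirrors the namenode's main method. It first parses the passed arguments (the parseHelpArgument check and the GenericOptionsParser), then creates a zkfc object with DFSZKFailoverController.create, and finally calls its run method to start the service. The return code of run becomes the process exit code.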
The create method that builds the zkfc object is as follows:
```java
public static DFSZKFailoverController create(Configuration conf) {
  Configuration localNNConf = DFSHAAdmin.addSecurityConfiguration(conf);
  String nsId = DFSUtil.getNamenodeNameServiceId(conf);
  if (!HAUtil.isHAEnabled(localNNConf, nsId)) {
    throw new HadoopIllegalArgumentException(
        "HA is not enabled for this namenode.");
  }
  String nnId = HAUtil.getNameNodeId(localNNConf, nsId);
  if (nnId == null) {
    String msg = "Could not get the namenode ID of this node. " +
        "You may run zkfc on the node other than namenode.";
    throw new HadoopIllegalArgumentException(msg);
  }
  NameNode.initializeGenericKeys(localNNConf, nsId, nnId);
  DFSUtil.setGenericConf(localNNConf, nsId, nnId, ZKFC_CONF_KEYS);
  NNHAServiceTarget localTarget = new NNHAServiceTarget(
      localNNConf, nsId, nnId);
  return new DFSZKFailoverController(localNNConf, localTarget);
}
```
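The key part is the last line, which directly creates a DFSZKFailoverController object. Before that, create verifies that HA is enabled and that the namenode ID of the local node can be determined, and throws HadoopIllegalArgumentException if either check fails. The constructor is as follows: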
```java
private DFSZKFailoverController(Configuration conf,
    NNHAServiceTarget localTarget) {
  super(conf, localTarget);
  this.localNNTarget = localTarget;
  // Setup ACLs
  adminAcl = new AccessControlList(
      conf.get(DFSConfigKeys.DFS_ADMIN, " "));
  LOG.info("Failover controller configured for NameNode " +
      localTarget);
}
```
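This constructor mainly assigns fields. It passes conf and localTarget to the parent constructor, stores localTarget, and builds the admin ACL (adminAcl) from the DFSConfigKeys.DFS_ADMIN setting.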
Next comes the run method, which DFSZKFailoverController inherits from its parent class ZKFailoverController:
```java
public int run(final String[] args) throws Exception {
  if (!localTarget.isAutoFailoverEnabled()) {
    LOG.fatal("Automatic failover is not enabled for " + localTarget + "." +
        " Please ensure that automatic failover is enabled in the " +
        "configuration before running the ZK failover controller.");
    return ERR_CODE_AUTO_FAILOVER_NOT_ENABLED;
  }
  loginAsFCUser();
  try {
    return SecurityUtil.doAsLoginUserOrFatal(new PrivilegedAction<Integer>() {
      @Override
      public Integer run() {
        try {
          return doRun(args);
        } catch (Exception t) {
          throw new RuntimeException(t);
        } finally {
          if (elector != null) {
            elector.terminateConnection();
          }
        }
      }
    });
  } catch (RuntimeException rte) {
    throw (Exception)rte.getCause();
  }
}
```
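The key part of run is a single call: doRun(args) inside the PrivilegedAction. Before that, run checks that automatic failover is enabled and logs in as the failover controller user. Inside the action, a checked exception is wrapped in a RuntimeException, and the outer catch unwraps it again with getCause(). The finally block closes the elector's ZooKeeper connection. doRun is as follows: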
```java
private int doRun(String[] args)
    throws HadoopIllegalArgumentException, IOException, InterruptedException {
  try {
    initZK();
  } catch (KeeperException ke) {
    LOG.fatal("Unable to start failover controller. Unable to connect "
        + "to ZooKeeper quorum at " + zkQuorum + ". Please check the "
        + "configured value for " + ZK_QUORUM_KEY + " and ensure that "
        + "ZooKeeper is running.");
    return ERR_CODE_NO_ZK;
  }
  if (args.length > 0) {
    if ("-formatZK".equals(args[0])) {
      boolean force = false;
      boolean interactive = true;
      for (int i = 1; i < args.length; i++) {
        if ("-force".equals(args[i])) {
          force = true;
        } else if ("-nonInteractive".equals(args[i])) {
          interactive = false;
        } else {
          badArg(args[i]);
        }
      }
      return formatZK(force, interactive);
    } else {
      badArg(args[0]);
    }
  }
  if (!elector.parentZNodeExists()) {
    LOG.fatal("Unable to start failover controller. "
        + "Parent znode does not exist.\n"
        + "Run with -formatZK flag to initialize ZooKeeper.");
    return ERR_CODE_NO_PARENT_ZNODE;
  }
  try {
    localTarget.checkFencingConfigured();
  } catch (BadFencingConfigurationException e) {
    LOG.fatal("Fencing is not configured for " + localTarget + ".\n" +
        "You must configure a fencing method before using automatic " +
        "failover.", e);
    return ERR_CODE_NO_FENCER;
  }
  initRPC();
  initHM();
  startRPC();
  try {
    mainLoop();
  } finally {
    rpcServer.stopAndJoin();
    elector.quitElection(true);
    healthMonitor.shutdown();
    healthMonitor.join();
  }
  return 0;
}
```
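The first key point is the initZK call at the top, which creates a ZooKeeper client and connects it to ZooKeeper. If this fails with a KeeperException, doRun logs the problem and returns ERR_CODE_NO_ZK. Next comes the handling of a -formatZK request. After that, the method runs some checks: the parent znode must exist (otherwise the user is told to run with -formatZK), and a fencing method must be configured. The second key point is the three calls initRPC, initHM and startRPC, which initialize RPC and HM and start the RPC server. The RPC service was analyzed in earlier documents, so the more important piece here is HM, short for HealthMonitor. Finally, mainLoop blocks until the controller exits. The finally block then stops the RPC server, leaves the election and shuts down the health monitor.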
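The -formatZK branch is plain argument parsing, so it can be lifted out as a standalone sketch. FormatZkArgs, FormatRequest and parse() are hypothetical names, not Hadoop code. Throwing IllegalArgumentException stands in for the badArg call, which rejects an unrecognized argument.

```java
// Hypothetical sketch of the argument handling at the start of doRun.
// Only the branching logic is reproduced; formatZK itself is not.
public class FormatZkArgs {

    /** Result of parsing: whether -formatZK was requested, plus its options. */
    static final class FormatRequest {
        final boolean format;
        final boolean force;
        final boolean interactive;

        FormatRequest(boolean format, boolean force, boolean interactive) {
            this.format = format;
            this.force = force;
            this.interactive = interactive;
        }
    }

    static FormatRequest parse(String[] args) {
        if (args.length == 0) {
            // No arguments: normal startup, nothing to format.
            return new FormatRequest(false, false, true);
        }
        if (!"-formatZK".equals(args[0])) {
            // Stands in for badArg(args[0]).
            throw new IllegalArgumentException("Bad argument: " + args[0]);
        }
        boolean force = false;
        boolean interactive = true;
        for (int i = 1; i < args.length; i++) {
            if ("-force".equals(args[i])) {
                force = true;
            } else if ("-nonInteractive".equals(args[i])) {
                interactive = false;
            } else {
                // Stands in for badArg(args[i]).
                throw new IllegalArgumentException("Bad argument: " + args[i]);
            }
        }
        return new FormatRequest(true, force, interactive);
    }

    public static void main(String[] args) {
        FormatRequest r = parse(new String[] {"-formatZK", "-nonInteractive"});
        System.out.println(r.format + " " + r.force + " " + r.interactive);
    }
}
```

For example, `-formatZK -nonInteractive` yields format=true, force=false, interactive=false. An empty argument list means a normal start without formatting.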
We start with initZK, which connects to ZooKeeper:
```java
private void initZK() throws HadoopIllegalArgumentException, IOException,
    KeeperException {
  zkQuorum = conf.get(ZK_QUORUM_KEY);
  int zkTimeout = conf.getInt(ZK_SESSION_TIMEOUT_KEY,
      ZK_SESSION_TIMEOUT_DEFAULT);
  // Parse ACLs from configuration.
  String zkAclConf = conf.get(ZK_ACL_KEY, ZK_ACL_DEFAULT);
  zkAclConf = ZKUtil.resolveConfIndirection(zkAclConf);
  List<ACL> zkAcls = ZKUtil.parseACLs(zkAclConf);
  if (zkAcls.isEmpty()) {
    zkAcls = Ids.CREATOR_ALL_ACL;
  }
  // Parse authentication from configuration.
  String zkAuthConf = conf.get(ZK_AUTH_KEY);
  zkAuthConf = ZKUtil.resolveConfIndirection(zkAuthConf);
  List<ZKAuthInfo> zkAuths;
  if (zkAuthConf != null) {
    zkAuths = ZKUtil.parseAuth(zkAuthConf);
  } else {
    zkAuths = Collections.emptyList();
  }
  // Sanity check configuration.
  Preconditions.checkArgument(zkQuorum != null,
      "Missing required configuration '%s' for ZooKeeper quorum",
      ZK_QUORUM_KEY);
  Preconditions.checkArgument(zkTimeout > 0,
      "Invalid ZK session timeout %s", zkTimeout);
  int maxRetryNum = conf.getInt(
      CommonConfigurationKeys.HA_FC_ELECTOR_ZK_OP_RETRIES_KEY,
      CommonConfigurationKeys.HA_FC_ELECTOR_ZK_OP_RETRIES_DEFAULT);
  elector = new ActiveStandbyElector(zkQuorum,
      zkTimeout, getParentZnode(), zkAcls, zkAuths,
      new ElectorCallbacks(), maxRetryNum);
}
```
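This method reads a series of configuration values: the quorum address, session timeout, ACLs (falling back to Ids.CREATOR_ALL_ACL when none are configured), authentication info and the retry count. Its last statement uses these values to create an ActiveStandbyElector object. The constructor of ActiveStandbyElector is as follows: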
```java
public ActiveStandbyElector(String zookeeperHostPorts,
    int zookeeperSessionTimeout, String parentZnodeName, List<ACL> acl,
    List<ZKAuthInfo> authInfo,
    ActiveStandbyElectorCallback app, int maxRetryNum) throws IOException,
    HadoopIllegalArgumentException, KeeperException {
  if (app == null || acl == null || parentZnodeName == null
      || zookeeperHostPorts == null || zookeeperSessionTimeout <= 0) {
    throw new HadoopIllegalArgumentException("Invalid argument");
  }
  zkHostPort = zookeeperHostPorts;
  zkSessionTimeout = zookeeperSessionTimeout;
  zkAcl = acl;
  zkAuthInfo = authInfo;
  appClient = app;
  znodeWorkingDir = parentZnodeName;
  zkLockFilePath = znodeWorkingDir + "/" + LOCK_FILENAME;
  zkBreadCrumbPath = znodeWorkingDir + "/" + BREADCRUMB_FILENAME;
  this.maxRetryNum = maxRetryNum;
  // createConnection for future API calls
  createConnection();
}
```
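This constructor mostly stores the passed arguments. It also derives the lock znode path (zkLockFilePath) and the breadcrumb znode path (zkBreadCrumbPath) under the parent znode. The key point is the final call to createConnection, which creates the client that connects to ZooKeeper: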
```java
private void createConnection() throws IOException, KeeperException {
  if (zkClient != null) {
    try {
      zkClient.close();
    } catch (InterruptedException e) {
      throw new IOException("Interrupted while closing ZK",
          e);
    }
    zkClient = null;
    watcher = null;
  }
  zkClient = connectToZooKeeper();
  LOG.debug("Created new connection for " + this);
}
```
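It first checks whether zkClient is null. If it is not, the existing client is closed and both zkClient and watcher are cleared. It then calls connectToZooKeeper to create a new client: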
```java
protected synchronized ZooKeeper connectToZooKeeper() throws IOException,
    KeeperException {
  // Unfortunately, the ZooKeeper constructor connects to ZooKeeper and
  // may trigger the Connected event immediately. So, if we register the
  // watcher after constructing ZooKeeper, we may miss that event. Instead,
  // we construct the watcher first, and have it block any events it receives
  // before we can set its ZooKeeper reference.
  watcher = new WatcherWithClientRef();
  ZooKeeper zk = createZooKeeper();
  watcher.setZooKeeperRef(zk);
  // Wait for the asynchronous success/failure. This may throw an exception
  // if we don't connect within the session timeout.
  watcher.waitForZKConnectionEvent(zkSessionTimeout);
  for (ZKAuthInfo auth : zkAuthInfo) {
    zk.addAuthInfo(auth.getScheme(), auth.getAuth());
  }
  return zk;
}
```
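Two objects are created here: watcher and zk. zk is the ZooKeeper client, and watcher monitors and handles ZooKeeper events. The code comment explains the order. The watcher is built before the ZooKeeper object because the ZooKeeper constructor may deliver the Connected event immediately, and the watcher holds back any event until setZooKeeperRef has been called. The method then waits up to the session timeout for the connection event and finally registers the configured authentication info on the client.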
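The ordering problem described in that comment can be reproduced without ZooKeeper. In this sketch, FakeClient is a hypothetical stand-in for org.apache.zookeeper.ZooKeeper. Like the real constructor, it may deliver its first event on another thread before the constructor returns. WatcherWithRef imitates the idea behind WatcherWithClientRef: it holds events on a CountDownLatch until the client reference is set. Throwing IllegalStateException on a timeout is a simplification of this sketch.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Sketch of "create the watcher first, then the client, then hand the
// watcher the client reference". FakeClient and WatcherWithRef are hypothetical.
public class WatcherFirstDemo {

    static final class FakeClient {
        FakeClient(Consumer<String> watcher) {
            // Deliver "Connected" asynchronously, possibly before the caller
            // has stored a reference to this client.
            new Thread(() -> watcher.accept("Connected")).start();
        }
    }

    static final class WatcherWithRef implements Consumer<String> {
        private final CountDownLatch refSet = new CountDownLatch(1);
        private final CountDownLatch gotEvent = new CountDownLatch(1);
        private volatile FakeClient client;
        volatile String firstEvent;
        volatile boolean clientKnownAtEvent;

        void setClientRef(FakeClient c) {
            client = c;
            refSet.countDown(); // release any event that arrived early
        }

        @Override
        public void accept(String event) {
            try {
                refSet.await(); // block the event until the reference is set
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            clientKnownAtEvent = (client != null);
            firstEvent = event;
            gotEvent.countDown();
        }
    }

    static WatcherWithRef connect(long timeoutMs) {
        WatcherWithRef watcher = new WatcherWithRef(); // 1. watcher first
        FakeClient client = new FakeClient(watcher);   // 2. then the client
        watcher.setClientRef(client);                  // 3. unblock events
        try {                                          // 4. wait for Connected
            if (!watcher.gotEvent.await(timeoutMs, TimeUnit.MILLISECONDS)) {
                throw new IllegalStateException("no connection event within timeout");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while connecting", e);
        }
        return watcher;
    }

    public static void main(String[] args) {
        WatcherWithRef w = connect(1000);
        System.out.println(w.firstEvent + " " + w.clientKnownAtEvent);
    }
}
```

Because the event thread waits on refSet, the handler always sees a non-null client, even when the event fires before the FakeClient constructor has returned.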
Once ZooKeeper is initialized, the next key step in zkfc is initializing the HM. initHM is as follows:
```java
private void initHM() {
  healthMonitor = new HealthMonitor(conf, localTarget);
  healthMonitor.addCallback(new HealthCallbacks());
  healthMonitor.addServiceStateCallback(new ServiceStateCallBacks());
  healthMonitor.start();
}
```
It first creates a HealthMonitor object. It then registers a HealthCallbacks object and a ServiceStateCallBacks object on it, and finally starts the HealthMonitor.
The HealthMonitor constructor is as follows:
```java
HealthMonitor(Configuration conf, HAServiceTarget target) {
  this.targetToMonitor = target;
  this.conf = conf;
  this.sleepAfterDisconnectMillis = conf.getLong(
      HA_HM_SLEEP_AFTER_DISCONNECT_KEY,
      HA_HM_SLEEP_AFTER_DISCONNECT_DEFAULT);
  this.checkIntervalMillis = conf.getLong(
      HA_HM_CHECK_INTERVAL_KEY,
      HA_HM_CHECK_INTERVAL_DEFAULT);
  this.connectRetryInterval = conf.getLong(
      HA_HM_CONNECT_RETRY_INTERVAL_KEY,
      HA_HM_CONNECT_RETRY_INTERVAL_DEFAULT);
  this.rpcTimeout = conf.getInt(
      HA_HM_RPC_TIMEOUT_KEY,
      HA_HM_RPC_TIMEOUT_DEFAULT);
  this.daemon = new MonitorDaemon();
}
```
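The constructor mostly assigns fields: the target, the configuration and four timing values (sleep after disconnect, check interval, connect retry interval and RPC timeout). The key field is daemon, set in the last line. It holds a MonitorDaemon object, and MonitorDaemon is a thread class that ultimately extends Thread.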
Next are the two methods that register callbacks:
```java
public void addCallback(Callback cb) {
  this.callbacks.add(cb);
}

public synchronized void addServiceStateCallback(ServiceStateCallback cb) {
  this.serviceStateCallbacks.add(cb);
}
```
Both methods simply append the passed object to the corresponding callback list.
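Here is a sketch of how a list of callbacks like this is typically consumed. The State values are a subset of the states that appear in this article. The rule that callbacks fire only when the state actually changes is an assumption of this sketch; the code shown above does not confirm it.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Minimal observer-list sketch in the style of HealthMonitor's callbacks.
// Notifying only on an actual state change is an assumption here.
public class CallbackListDemo {

    enum State { INITIALIZING, SERVICE_NOT_RESPONDING, SERVICE_HEALTHY, SERVICE_UNHEALTHY }

    interface Callback {
        void enteredState(State newState);
    }

    private final List<Callback> callbacks =
            Collections.synchronizedList(new ArrayList<>());
    private State state = State.INITIALIZING;

    void addCallback(Callback cb) {
        callbacks.add(cb); // registration only appends to the list
    }

    synchronized void enterState(State newState) {
        if (newState == state) {
            return; // unchanged state: nothing to report
        }
        state = newState;
        // Iterating a synchronizedList requires holding its lock.
        synchronized (callbacks) {
            for (Callback cb : callbacks) {
                cb.enteredState(newState);
            }
        }
    }

    public static void main(String[] args) {
        CallbackListDemo monitor = new CallbackListDemo();
        List<State> seen = new ArrayList<>();
        monitor.addCallback(seen::add);
        monitor.enterState(State.SERVICE_HEALTHY);
        monitor.enterState(State.SERVICE_HEALTHY); // repeated, not reported
        monitor.enterState(State.SERVICE_UNHEALTHY);
        System.out.println(seen);
    }
}
```

Running main prints `[SERVICE_HEALTHY, SERVICE_UNHEALTHY]`, and the repeated SERVICE_HEALTHY is not reported again.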
Finally, the start method:
```java
void start() {
  daemon.start();
}
```
As noted above, daemon is a thread, so this start call starts the thread. Its run method is as follows:
```java
@Override
public void run() {
  while (shouldRun) {
    try {
      loopUntilConnected();
      doHealthChecks();
    } catch (InterruptedException ie) {
      Preconditions.checkState(!shouldRun,
          "Interrupted but still supposed to run");
    }
  }
}
```
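The two key calls are loopUntilConnected and doHealthChecks. This thread runs health checks against the namenode, and only a namenode that passes them takes part in the election. loopUntilConnected blocks until a connection to the namenode exists, and doHealthChecks performs the actual checks. The loop continues while shouldRun is true. An InterruptedException is accepted only when shouldRun is already false, that is, during shutdown.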
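The shape of this loop can be sketched as follows: retry until connected, check until the connection is lost, and stop through a flag plus an interrupt. This is a simplified, self-contained illustration, not Hadoop code. The Service interface with connect() and check() is hypothetical. It stands in for tryConnect/createProxy and for the remote health-check calls. The sleep after a disconnect (sleepAfterDisconnectMillis) is left out.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Sketch of a MonitorDaemon-like loop. Service is a hypothetical interface.
public class MonitorLoopDemo {

    interface Service {
        boolean connect(); // true if a proxy could be created
        boolean check();   // false signals a transport-level failure
    }

    private final Service service;
    private final long retryMillis;
    private final long checkMillis;
    private final Thread daemon;
    private volatile boolean shouldRun = true;

    MonitorLoopDemo(Service service, long retryMillis, long checkMillis) {
        this.service = service;
        this.retryMillis = retryMillis;
        this.checkMillis = checkMillis;
        this.daemon = new Thread(this::run, "monitor-daemon");
        this.daemon.setDaemon(true);
    }

    void start() {
        daemon.start();
    }

    void shutdown() {
        shouldRun = false;
        daemon.interrupt(); // wake the thread if it is sleeping
        try {
            daemon.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    private void run() {
        while (shouldRun) {
            try {
                loopUntilConnected();
                doHealthChecks();
            } catch (InterruptedException ie) {
                // As in the original, an interrupt is only expected during shutdown.
                if (shouldRun) {
                    throw new IllegalStateException("Interrupted but still supposed to run");
                }
            }
        }
    }

    private void loopUntilConnected() throws InterruptedException {
        boolean connected = service.connect();
        while (!connected) {
            Thread.sleep(retryMillis);
            connected = service.connect();
        }
    }

    private void doHealthChecks() throws InterruptedException {
        while (shouldRun) {
            if (!service.check()) {
                return; // connection lost: go back to loopUntilConnected
            }
            Thread.sleep(checkMillis);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        AtomicInteger attempts = new AtomicInteger();
        Service svc = new Service() {
            public boolean connect() { return attempts.incrementAndGet() >= 3; }
            public boolean check() { return true; }
        };
        MonitorLoopDemo monitor = new MonitorLoopDemo(svc, 10, 10);
        monitor.start();
        Thread.sleep(200);
        monitor.shutdown();
        System.out.println("connect attempts: " + attempts.get());
    }
}
```

The thread stops when shutdown clears a volatile flag and then interrupts it. This mirrors the checkState in the original run method, which treats an interrupt as legitimate only once the flag is already false.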
loopUntilConnected is as follows:
```java
private void loopUntilConnected() throws InterruptedException {
  tryConnect();
  while (proxy == null) {
    Thread.sleep(connectRetryInterval);
    tryConnect();
  }
  assert proxy != null;
}
```
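This is straightforward. tryConnect is called once, and while proxy is still null the loop sleeps for connectRetryInterval and calls tryConnect again. tryConnect is as follows: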
```java
private void tryConnect() {
  Preconditions.checkState(proxy == null);
  try {
    synchronized (this) {
      proxy = createProxy();
    }
  } catch (IOException e) {
    LOG.warn("Could not connect to local service at " + targetToMonitor +
        ": " + e.getMessage());
    proxy = null;
    enterState(State.SERVICE_NOT_RESPONDING);
  }
}
```
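This method is also simple. Its main step is the createProxy call, which creates the proxy object. If that call throws an IOException, proxy stays null and the monitor enters the SERVICE_NOT_RESPONDING state. createProxy is as follows: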
```java
protected HAServiceProtocol createProxy() throws IOException {
  return targetToMonitor.getProxy(conf, rpcTimeout);
}
```
The getProxy method it calls is as follows:
```java
public HAServiceProtocol getProxy(Configuration conf, int timeoutMs)
    throws IOException {
  Configuration confCopy = new Configuration(conf);
  // Lower the timeout so we quickly fail to connect
  confCopy.setInt(CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 1);
  SocketFactory factory = NetUtils.getDefaultSocketFactory(confCopy);
  return new HAServiceProtocolClientSideTranslatorPB(
      getAddress(),
      confCopy, factory, timeoutMs);
}
```
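getProxy first copies the configuration and limits IPC client connect retries to 1, so a failed connection is detected quickly. What it finally creates is an HAServiceProtocolClientSideTranslatorPB object, whose constructor is as follows: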
```java
public HAServiceProtocolClientSideTranslatorPB(
    InetSocketAddress addr, Configuration conf,
    SocketFactory socketFactory, int timeout) throws IOException {
  RPC.setProtocolEngine(conf, HAServiceProtocolPB.class,
      ProtobufRpcEngine.class);
  rpcProxy = RPC.getProxy(HAServiceProtocolPB.class,
      RPC.getProtocolVersion(HAServiceProtocolPB.class), addr,
      UserGroupInformation.getCurrentUser(), conf, socketFactory, timeout);
}
```
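The constructor registers ProtobufRpcEngine as the engine for HAServiceProtocolPB and then calls RPC.getProxy to obtain the proxy object. An earlier document analyzed this same method, the one the HDFS RPC client uses to obtain proxies. The connection to the namenode here is therefore an RPC connection.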
The health-check design is now clear: the monitor connects to the corresponding NameNode over RPC and has that namenode perform the actual checks.
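The JDK's java.lang.reflect.Proxy can illustrate the idea of a client-side proxy. This is an analogy that rests on several assumptions, not Hadoop's implementation. HaService, LocalNameNode and the in-process dispatch are all hypothetical. Real Hadoop RPC serializes the call (here with protobuf) and sends it over a socket to the NameNode. The handler below only records the method name and then invokes a local object.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;

// Analogy for an RPC client proxy: every interface call goes through one
// handler. HaService and LocalNameNode are hypothetical.
public class RpcProxyDemo {

    interface HaService {
        String getServiceStatus();
        void monitorHealth();
    }

    /** Plays the role of the remote NameNode in this in-process sketch. */
    static final class LocalNameNode implements HaService {
        private final boolean healthy;

        LocalNameNode(boolean healthy) {
            this.healthy = healthy;
        }

        public String getServiceStatus() {
            return "standby";
        }

        public void monitorHealth() {
            if (!healthy) {
                throw new IllegalStateException("health check failed");
            }
        }
    }

    static HaService createProxy(HaService server, List<String> wireLog) {
        InvocationHandler handler = (proxy, method, args) -> {
            wireLog.add(method.getName()); // stands in for sending the request
            try {
                return method.invoke(server, args); // stands in for server-side dispatch
            } catch (InvocationTargetException e) {
                throw e.getCause(); // surface the "remote" exception to the caller
            }
        };
        return (HaService) Proxy.newProxyInstance(
                HaService.class.getClassLoader(),
                new Class<?>[] {HaService.class},
                handler);
    }

    public static void main(String[] args) {
        List<String> wire = new ArrayList<>();
        HaService proxy = createProxy(new LocalNameNode(true), wire);
        System.out.println(proxy.getServiceStatus());
        proxy.monitorHealth();
        System.out.println(wire);
    }
}
```

Running main prints `standby` followed by `[getServiceStatus, monitorHealth]`. The caller works with an ordinary interface, while every call passes through a single handler. In a real RPC client, that handler is where the networking happens.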
The remote calls are made in the doHealthChecks method mentioned above:
```java
private void doHealthChecks() throws InterruptedException {
  while (shouldRun) {
    HAServiceStatus status = null;
    boolean healthy = false;
    try {
      status = proxy.getServiceStatus();
      proxy.monitorHealth();
      healthy = true;
    } catch (Throwable t) {
      if (isHealthCheckFailedException(t)) {
        LOG.warn("Service health check failed for " + targetToMonitor
            + ": " + t.getMessage());
        enterState(State.SERVICE_UNHEALTHY);
      } else {
        LOG.warn("Transport-level exception trying to monitor health of " +
            targetToMonitor + ": " + t.getCause() + " " + t.getLocalizedMessage());
        RPC.stopProxy(proxy);
        proxy = null;
        enterState(State.SERVICE_NOT_RESPONDING);
        Thread.sleep(sleepAfterDisconnectMillis);
        return;
      }
    }
    if (status != null) {
      setLastServiceStatus(status);
    }
    if (healthy) {
      enterState(State.SERVICE_HEALTHY);
    }
    Thread.sleep(checkIntervalMillis);
  }
}
```
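Each iteration makes two remote calls: getServiceStatus and monitorHealth. If both succeed, the namenode has passed the health check and healthy is set to true, and enterState then moves the monitor to the SERVICE_HEALTHY state. If either call throws, the exception is classified. A health-check failure (isHealthCheckFailedException) moves the monitor to SERVICE_UNHEALTHY and keeps the connection. Any other exception is treated as a transport-level error: the proxy is stopped and set to null, the monitor enters SERVICE_NOT_RESPONDING, sleeps for sleepAfterDisconnectMillis and returns, so the outer loop reconnects through loopUntilConnected. Whenever a status was obtained, setLastServiceStatus records it. Each iteration ends by sleeping for checkIntervalMillis.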
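The classification step can be isolated into a small sketch. Everything here is an illustrative stand-in. The Probe interface, classify() and the local HealthCheckFailedException class are hypothetical. Hadoop has its own exception class, and its isHealthCheckFailedException may also need to unwrap exceptions that come back over RPC, whereas this sketch only inspects the direct type. The sketch also catches Exception rather than Throwable.

```java
// Sketch of the per-iteration decision made in doHealthChecks.
public class HealthCheckDemo {

    enum State { SERVICE_HEALTHY, SERVICE_UNHEALTHY, SERVICE_NOT_RESPONDING }

    /** Hypothetical stand-in for the "node reports itself unhealthy" exception. */
    static final class HealthCheckFailedException extends Exception {
        HealthCheckFailedException(String msg) {
            super(msg);
        }
    }

    /** The two remote calls made on each iteration. */
    interface Probe {
        void getServiceStatus() throws Exception;
        void monitorHealth() throws Exception;
    }

    /** Returns the state one iteration of the check loop would enter. */
    static State classify(Probe probe) {
        try {
            probe.getServiceStatus();
            probe.monitorHealth();
            return State.SERVICE_HEALTHY;
        } catch (HealthCheckFailedException e) {
            // The node answered but reported a failed check: keep the connection.
            return State.SERVICE_UNHEALTHY;
        } catch (Exception e) {
            // Anything else counts as transport-level: drop the proxy and reconnect.
            return State.SERVICE_NOT_RESPONDING;
        }
    }

    /** Builds a probe that throws the given exceptions (null means success). */
    static Probe probe(Exception onStatus, Exception onHealth) {
        return new Probe() {
            public void getServiceStatus() throws Exception {
                if (onStatus != null) throw onStatus;
            }

            public void monitorHealth() throws Exception {
                if (onHealth != null) throw onHealth;
            }
        };
    }

    public static void main(String[] args) {
        System.out.println(classify(probe(null, null)));
        System.out.println(classify(probe(null, new HealthCheckFailedException("low disk"))));
        System.out.println(classify(probe(new java.io.IOException("refused"), null)));
    }
}
```

Running main prints SERVICE_HEALTHY, SERVICE_UNHEALTHY and SERVICE_NOT_RESPONDING, one per line. The distinction matters because only the last case discards the RPC connection.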
Sharing is welcome; please credit the source when reposting: 内存溢出