如果你打开zkServer.cmd文件,就会发现ZooKeeper的启动类是QuorumPeerMain。既然是启动类,必然存在一个main方法,如下:
/**
 * Entry point of the startup class named in zkServer.cmd: creates a
 * {@code QuorumPeerMain} instance and hands the command-line arguments to
 * {@code initializeAndRun}.
 *
 * <p>NOTE(review): {@code initializeAndRun} declares checked exceptions
 * (ConfigException, IOException, AdminServerException); this abbreviated excerpt
 * omits the handling a compilable {@code main} needs.
 *
 * @param args command-line arguments, forwarded unchanged to {@code initializeAndRun}
 */
public static void main(String[] args) {
    // The type and the variable name were fused together ("QuorumPeerMainmain"),
    // which left the local variable `main` undeclared.
    QuorumPeerMain main = new QuorumPeerMain();
    main.initializeAndRun(args);
}
main方法通过调用QuorumPeerMain实例(变量main)的initializeAndRun方法实现ZooKeeper的启动,这也是我们分析ZooKeeper源码的入口。
protected void initializeAndRun(String[] args) throws ConfigException, IOException, AdminServerException { //解析配置文件,参数传递的应是配置文件也就是zoo.cfg QuorumPeerConfig config = new QuorumPeerConfig(); if (args.length == 1) { config.parse(args[0]); } //这里先不用管,后续在分析(大致是清理多余的数据日志) DatadirCleanupManager purgeMgr = new DatadirCleanupManager( config.getDataDir(), config.getDataLogDir(), config.getSnapRetainCount(), config.getPurgeInterval()); purgeMgr.start(); //此时就会判断,如果是集群启动 if (args.length == 1 && config.isDistributed()) { runFromConfig(config); } else { //否则走单机启动 ZooKeeperServerMain.main(args); } }
这里就分为单机启动和集群启动。
二、单机启动
单机启动是通过ZooKeeperServerMain的main方法启动的,我们也可以直接将其配置为启动类(单机需求下)。两者都是调用initializeAndRun方法,只是单机模式下的配置文件解析对象是ServerConfig(后文单独分析配置文件的解析过程)。接下来就是调用runFromConfig(config)启动ZooKeeper。
/**
 * Starts a standalone ZooKeeper server from the given configuration and blocks
 * the calling thread until the shutdown latch is released.
 *
 * <p>Sequence: start the metrics provider, create the transaction/snapshot log,
 * build the {@code ZooKeeperServer}, start the admin server, start the plain
 * and/or secure connection factories, start the container manager, then wait on
 * the latch and shut everything down. The transaction log and the metrics
 * provider are always released in the {@code finally} block.
 *
 * @param config standalone server configuration
 * @throws IOException          if the metrics provider cannot be booted, or propagated
 *                              from the start-up calls
 * @throws AdminServerException declared by this method; NOTE(review): the throwing call
 *                              is not identifiable from this excerpt
 */
public void runFromConfig(ServerConfig config) throws IOException, AdminServerException {
    FileTxnSnapLog txnLog = null;
    try {
        try {
            metricsProvider = MetricsProviderBootstrap.startMetricsProvider(
                config.getMetricsProviderClassName(),
                config.getMetricsProviderConfiguration());
        } catch (MetricsProviderLifeCycleException error) {
            // Surface metrics start-up failures as IOException, keeping the cause.
            throw new IOException("Cannot boot MetricsProvider " + config.getMetricsProviderClassName(), error);
        }
        ServerMetrics.metricsProviderInitialized(metricsProvider);
        // Register the authentication providers.
        ProviderRegistry.initialize();
        // Create the transaction-log / snapshot object.
        txnLog = new FileTxnSnapLog(config.dataLogDir, config.dataDir);
        // Stays null unless the configuration asks for a JVM pause monitor.
        JvmPauseMonitor jvmPauseMonitor = null;
        if (config.jvmPauseMonitorToRun) {
            jvmPauseMonitor = new JvmPauseMonitor(config);
        }
        // Instantiate the ZooKeeper server object.
        final ZooKeeperServer zkServer = new ZooKeeperServer(jvmPauseMonitor, txnLog, config.tickTime, config.minSessionTimeout, config.maxSessionTimeout, config.listenBacklog, null, config.initialConfig);
        // Hand the server statistics to the transaction log.
        txnLog.setServerStats(zkServer.serverStats());
        // Latch used to wait for the ZooKeeper server to stop.
        final CountDownLatch shutdownLatch = new CountDownLatch(1);
        // Register the server shutdown handler, which is given the latch.
        zkServer.registerServerShutdownHandler(new ZooKeeperServerShutdownHandler(shutdownLatch));
        // Create the admin server.
        adminServer = AdminServerFactory.createAdminServer();
        // Attach the ZooKeeper server to it.
        adminServer.setZooKeeperServer(zkServer);
        // Start the admin server (default port 8080, configurable through
        // zookeeper.admin.serverPort); analysed later in the series.
        adminServer.start();
        boolean needStartZKServer = true;
        if (config.getClientPortAddress() != null) {
            // The connection factory defaults to NIOServerCnxnFactory and can be
            // changed with the zookeeper.serverCnxnFactory property.
            cnxnFactory = ServerCnxnFactory.createFactory();
            // Apply the client-port configuration (last argument false = non-secure).
            cnxnFactory.configure(config.getClientPortAddress(), config.getMaxClientCnxns(), config.getClientPortListenBacklog(), false);
            // Start serving.
            cnxnFactory.startup(zkServer);
            // zkServer has been started. So we don't need to start it again in secureCnxnFactory.
            needStartZKServer = false;
        }
        if (config.getSecureClientPortAddress() != null) {
            // Same as above for the secure client port (last argument true = secure).
            secureCnxnFactory = ServerCnxnFactory.createFactory();
            secureCnxnFactory.configure(config.getSecureClientPortAddress(), config.getMaxClientCnxns(), config.getClientPortListenBacklog(), true);
            secureCnxnFactory.startup(zkServer, needStartZKServer);
        }
        // Container manager; its intervals and limits come from system properties
        // with the defaults given below.
        containerManager = new ContainerManager(
            zkServer.getZKDatabase(),
            zkServer.firstProcessor,
            Integer.getInteger("znode.container.checkIntervalMs", (int) TimeUnit.MINUTES.toMillis(1)),
            Integer.getInteger("znode.container.maxPerMinute", 10000),
            Long.getLong("znode.container.maxNeverUsedIntervalMs", 0)
        );
        // Start the container manager.
        containerManager.start();
        ZKAuditProvider.addZKStartStopAuditLog();
        // The thread blocks here until the server is shut down.
        shutdownLatch.await();
        shutdown();
        if (cnxnFactory != null) {
            // Wait for the serving threads to finish.
            cnxnFactory.join();
        }
        if (secureCnxnFactory != null) {
            secureCnxnFactory.join();
        }
        if (zkServer.canShutdown()) {
            // Full shutdown.
            zkServer.shutdown(true);
        }
    } catch (InterruptedException e) {
        // warn, but generally this is ok
        LOG.warn("Server interrupted", e);
    } finally {
        // Always release the transaction log and stop the metrics provider.
        if (txnLog != null) {
            txnLog.close();
        }
        if (metricsProvider != null) {
            try {
                metricsProvider.stop();
            } catch (Throwable error) {
                // Best effort: a failure to stop metrics is only logged.
                LOG.warn("Error while stopping metrics", error);
            }
        }
    }
}
单从启动类看,启动逻辑比较简单:读取配置文件、启动管理服务(AdminServer)、启动ZooKeeper服务,然后使用CountDownLatch使线程阻塞,直到服务停止。
三、集群启动
集群模式下配置文件的解析是通过QuorumPeerConfig对象实现的,之后同样是调用runFromConfig(config)方法启动。
/**
 * Starts this node as a member of a ZooKeeper cluster and blocks the calling
 * thread until the {@code QuorumPeer} terminates.
 *
 * <p>Sequence: start the metrics provider, create and configure the plain and/or
 * secure connection factories, copy the configuration into the
 * {@code QuorumPeer}, start it, and join it. The metrics provider is always
 * stopped in the {@code finally} block.
 *
 * @param config cluster (quorum) configuration
 * @throws IOException          if the metrics provider cannot be booted, or propagated
 *                              from the start-up calls
 * @throws AdminServerException declared by this method; NOTE(review): the throwing call
 *                              is not identifiable from this excerpt
 */
public void runFromConfig(QuorumPeerConfig config) throws IOException, AdminServerException {
    final MetricsProvider metricsProvider;
    try {
        metricsProvider = MetricsProviderBootstrap.startMetricsProvider(
            config.getMetricsProviderClassName(),
            config.getMetricsProviderConfiguration());
    } catch (MetricsProviderLifeCycleException error) {
        // Surface metrics start-up failures as IOException, keeping the cause.
        throw new IOException("Cannot boot MetricsProvider " + config.getMetricsProviderClassName(), error);
    }
    try {
        ServerMetrics.metricsProviderInitialized(metricsProvider);
        ProviderRegistry.initialize();
        ServerCnxnFactory cnxnFactory = null;
        ServerCnxnFactory secureCnxnFactory = null;
        // Create the connection factories. Unlike the standalone path, they are only
        // configured here and then handed to the QuorumPeer below.
        if (config.getClientPortAddress() != null) {
            cnxnFactory = ServerCnxnFactory.createFactory();
            cnxnFactory.configure(config.getClientPortAddress(), config.getMaxClientCnxns(), config.getClientPortListenBacklog(), false);
        }
        if (config.getSecureClientPortAddress() != null) {
            secureCnxnFactory = ServerCnxnFactory.createFactory();
            secureCnxnFactory.configure(config.getSecureClientPortAddress(), config.getMaxClientCnxns(), config.getClientPortListenBacklog(), true);
        }
        // Cluster start-up is carried out by the QuorumPeer instance; everything
        // below copies configuration values into it.
        quorumPeer = getQuorumPeer();
        quorumPeer.setTxnFactory(new FileTxnSnapLog(config.getDataLogDir(), config.getDataDir()));
        quorumPeer.enableLocalSessions(config.areLocalSessionsEnabled());
        quorumPeer.enableLocalSessionsUpgrading(config.isLocalSessionsUpgradingEnabled());
        //quorumPeer.setQuorumPeers(config.getAllMembers());
        quorumPeer.setElectionType(config.getElectionAlg());
        quorumPeer.setMyid(config.getServerId());
        quorumPeer.setTickTime(config.getTickTime());
        quorumPeer.setMinSessionTimeout(config.getMinSessionTimeout());
        quorumPeer.setMaxSessionTimeout(config.getMaxSessionTimeout());
        quorumPeer.setInitLimit(config.getInitLimit());
        quorumPeer.setSyncLimit(config.getSyncLimit());
        quorumPeer.setConnectToLearnerMasterLimit(config.getConnectToLearnerMasterLimit());
        quorumPeer.setObserverMasterPort(config.getObserverMasterPort());
        quorumPeer.setConfigFileName(config.getConfigFilename());
        quorumPeer.setClientPortListenBacklog(config.getClientPortListenBacklog());
        // The database is built on the transaction factory set above.
        quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory()));
        quorumPeer.setQuorumVerifier(config.getQuorumVerifier(), false);
        if (config.getLastSeenQuorumVerifier() != null) {
            quorumPeer.setLastSeenQuorumVerifier(config.getLastSeenQuorumVerifier(), false);
        }
        quorumPeer.initConfigInZKDatabase();
        quorumPeer.setCnxnFactory(cnxnFactory);
        quorumPeer.setSecureCnxnFactory(secureCnxnFactory);
        quorumPeer.setSslQuorum(config.isSslQuorum());
        quorumPeer.setUsePortUnification(config.shouldUsePortUnification());
        quorumPeer.setLearnerType(config.getPeerType());
        quorumPeer.setSyncEnabled(config.getSyncEnabled());
        quorumPeer.setQuorumListenOnAllIPs(config.getQuorumListenOnAllIPs());
        if (config.sslQuorumReloadCertFiles) {
            quorumPeer.getX509Util().enableCertFileReloading();
        }
        quorumPeer.setMultiAddressEnabled(config.isMultiAddressEnabled());
        quorumPeer.setMultiAddressReachabilityCheckEnabled(config.isMultiAddressReachabilityCheckEnabled());
        quorumPeer.setMultiAddressReachabilityCheckTimeoutMs(config.getMultiAddressReachabilityCheckTimeoutMs());
        // sets quorum sasl authentication configurations
        quorumPeer.setQuorumSaslEnabled(config.quorumEnableSasl);
        if (quorumPeer.isQuorumSaslAuthEnabled()) {
            quorumPeer.setQuorumServerSaslRequired(config.quorumServerRequireSasl);
            quorumPeer.setQuorumLearnerSaslRequired(config.quorumLearnerRequireSasl);
            quorumPeer.setQuorumServicePrincipal(config.quorumServicePrincipal);
            quorumPeer.setQuorumServerLoginContext(config.quorumServerLoginContext);
            quorumPeer.setQuorumLearnerLoginContext(config.quorumLearnerLoginContext);
        }
        quorumPeer.setQuorumCnxnThreadsSize(config.quorumCnxnThreadsSize);
        quorumPeer.initialize();
        if (config.jvmPauseMonitorToRun) {
            quorumPeer.setJvmPauseMonitor(new JvmPauseMonitor(config));
        }
        // Start the service.
        quorumPeer.start();
        ZKAuditProvider.addZKStartStopAuditLog();
        // The thread blocks here until the service shuts down.
        quorumPeer.join();
    } catch (InterruptedException e) {
        // warn, but generally this is ok
        LOG.warn("Quorum Peer interrupted", e);
    } finally {
        try {
            metricsProvider.stop();
        } catch (Throwable error) {
            // Best effort: a failure to stop metrics is only logged.
            LOG.warn("Error while stopping metrics", error);
        }
    }
}
集群模式下是交给了QuorumPeer对象启动。
四、总结
不管是单机还是集群,以上都只是简单介绍了启动入口方法,实际的启动过程以及底层的工作原理还需进一步分析,后续我们将从单机启动和集群启动两方面分别进行分析。
以上,有任何不对的地方,请留言指正,敬请谅解。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)