- zk 中的数据模型是一棵树(DataTree),树上的每个节点叫做 DataNode
- zk 集群中的 DataTree 时刻保持状态同步
- ZooKeeper 集群中的每个 zk 节点,都在内存和磁盘中各保存一份完整的数据。
- 内存数据:DataTree
- 磁盘数据:快照文件 + 编辑日志
1) 启动集群
QuorumPeerMain类
/**
 * Process entry point: builds a {@code QuorumPeerMain} and hands the
 * command-line arguments to {@link #initializeAndRun(String[])}.
 *
 * <p>Exit status: 0 on normal return, 2 for invalid arguments or invalid
 * config, 3 when the data directory cannot be accessed, 4 when the
 * AdminServer cannot be started, 1 for any other exception.
 *
 * @param args command-line arguments, forwarded unchanged
 */
public static void main(String[] args) {
    QuorumPeerMain main = new QuorumPeerMain();
    try {
        main.initializeAndRun(args);
    } catch (IllegalArgumentException e) {
        // Bad arguments additionally get the usage text, on the log and on stderr.
        LOG.error("Invalid arguments, exiting abnormally", e);
        LOG.info(USAGE);
        System.err.println(USAGE);
        System.exit(2);
    } catch (ConfigException e) {
        exitAbnormally("Invalid config, exiting abnormally", e, 2);
    } catch (DatadirException e) {
        exitAbnormally("Unable to access datadir, exiting abnormally", e, 3);
    } catch (AdminServerException e) {
        exitAbnormally("Unable to start AdminServer, exiting abnormally", e, 4);
    } catch (Exception e) {
        LOG.error("Unexpected exception, exiting abnormally", e);
        System.exit(1);
    }
    LOG.info("Exiting normally");
    System.exit(0);
}

/** Logs {@code message} with its cause, echoes it on stderr and exits with {@code status}. */
private static void exitAbnormally(String message, Exception cause, int status) {
    LOG.error(message, cause);
    System.err.println(message);
    System.exit(status);
}
initializeAndRun
protected void initializeAndRun(String[] args) throws ConfigException, IOException, AdminServerException { QuorumPeerConfig config = new QuorumPeerConfig(); if (args.length == 1) { config.parse(args[0]); } // Start and schedule the the purge task DatadirCleanupManager purgeMgr = new DatadirCleanupManager(config .getDataDir(), config.getDataLogDir(), config .getSnapRetainCount(), config.getPurgeInterval()); purgeMgr.start(); if (args.length == 1 && config.isDistributed()) { // 集群启动 runFromConfig(config); } else { LOG.warn("Either no config or no quorum defined in config, running " + " in standalone mode"); // there is only server in the quorum -- run as standalone ZooKeeperServerMain.main(args); } }
runFromConfig方法
public void runFromConfig(QuorumPeerConfig config) throws IOException, AdminServerException { try { ManagedUtil.registerLog4jMBeans(); } catch (JMException e) { LOG.warn("Unable to register log4j JMX control", e); } LOG.info("Starting quorum peer"); try { ServerCnxnFactory cnxnFactory = null; ServerCnxnFactory secureCnxnFactory = null; ...... quorumPeer.setQuorumCnxnThreadsSize(config.quorumCnxnThreadsSize); quorumPeer.initialize(); quorumPeer.start(); quorumPeer.join();
2) 冷启动恢复数据
start()
@Override public synchronized void start() { if (!getView().containsKey(myid)) { throw new RuntimeException("My id " + myid + " not in the peer list"); } // 冷启动数据恢复 loadDatabase(); startServerCnxnFactory(); try { adminServer.start(); } catch (AdminServerException e) { LOG.warn("Problem starting AdminServer", e); System.out.println(e); } startLeaderElection(); super.start(); }
private void loadDatabase() { try { // 加载磁盘数据到内存,恢复 DataTree // zk 的操作分两种:事务操作和非事务操作 // 事务操作:zk.create();都会被分配一个全局唯一的 zxid,zxid 组成:64 位: (前 32 位:epoch 每个 leader 任期的代号;后 32 位:txid 为事务 id) // 非事务操作:zk.getData() // 数据恢复过程: // (1)从快照文件中恢复大部分数据,并得到一个 lastProcessZXid // (2)再从编辑日志中执行 replay,执行到最后一条日志并更新 lastProcessZXid // (3)最终得到,datatree 和 lastProcessZXid,表示数据恢复完成 zkDb.loadDatabase(); // load the epochs long lastProcessedZxid = zkDb.getDataTree().lastProcessedZxid; long epochOfZxid = ZxidUtils.getEpochFromZxid(lastProcessedZxid); try { currentEpoch = readLongFromFile(CURRENT_EPOCH_FILENAME); ......
loadDatabase()
/**
 * Restores the in-memory database through {@code snapLog.restore} and marks
 * it as initialized.
 *
 * @return the zxid returned by {@code snapLog.restore}
 * @throws IOException if the restore fails
 */
public long loadDatabase() throws IOException {
    final long restoredZxid = snapLog.restore(
            dataTree, sessionsWithTimeouts, commitProposalPlaybackListener);
    initialized = true;
    return restoredZxid;
}
restore()
public long restore(DataTree dt, Mapsessions, PlayBackListener listener) throws IOException { // 恢复 快照文件到 数据到 DataTree long deserializeResult = snapLog.deserialize(dt, sessions); FileTxnLog txnLog = new FileTxnLog(dataDir); RestoreFinalizer finalizer = () -> { // 恢复 编辑日志到 数据到 DataTree long highestZxid = fastForwardFromEdits(dt, sessions, listener); return highestZxid; if (-1L == deserializeResult) { if (txnLog.getLastLoggedZxid() != -1) { // ZOOKEEPER-3056: provides an escape hatch for users upgrading // from old versions of zookeeper (3.4.x, pre 3.5.3). if (!trustEmptySnapshot) { throw new IOException(EMPTY_SNAPSHOT_WARNING + "Something is broken!"); } else { LOG.warn("{}This should only be allowed during upgrading.", EMPTY_SNAPSHOT_WARNING); return finalizer.run(); } } save(dt, (ConcurrentHashMap )sessions); return 0; }
deserialize()
ctrl + alt +B 查找 deserialize 实现类 FileSnap.java
public long deserialize(DataTree dt, Mapsessions) throws IOException { // we run through 100 snapshots (not all of them) // if we cannot get it running within 100 snapshots // we should give up List snapList = findNValidSnapshots(100); if (snapList.size() == 0) { return -1L; } File snap = null; boolean foundValid = false; // 依次遍历每一个快照的数据 for (int i = 0, snapListSize = snapList.size(); i < snapListSize; i++) { snap = snapList.get(i); LOG.info("Reading snapshot " + snap); try ( // 反序列化 环境准备 InputStream snapIS = new BufferedInputStream(new FileInputStream(snap)); CheckedInputStream crcIn = new CheckedInputStream(snapIS, new Adler32())) { InputArchive ia = BinaryInputArchive.getArchive(crcIn); // 反序列到 化,恢复数据到 DataTree deserialize(dt, sessions, ia); long checkSum = crcIn.getChecksum().getValue(); long val = ia.readLong("val"); if (val != checkSum) { throw new IOException("CRC corruption in snapshot : " + snap); } foundValid = true; break; } catch (IOException e) { LOG.warn("problem reading snap file " + snap, e); } } if (!foundValid) { throw new IOException("Not able to find valid snapshots in " + snapDir); } dt.lastProcessedZxid = Util.getZxidFromName(snap.getName(), SNAPSHOT_FILE_PREFIX); return dt.lastProcessedZxid; }
deserialize()
public void deserialize(DataTree dt, Mapsessions, InputArchive ia) throws IOException { FileHeader header = new FileHeader(); header.deserialize(ia, "fileheader"); if (header.getMagic() != SNAP_MAGIC) { throw new IOException("mismatching magic headers " + header.getMagic() + " != " + FileSnap.SNAP_MAGIC); } // 恢复快照数据到 DataTree SerializeUtils.deserializeSnapshot(dt,ia,sessions); }
deserializeSnapshot()
public static void deserializeSnapshot(DataTree dt,InputArchive ia, Mapsessions) throws IOException { int count = ia.readInt("count"); while (count > 0) { long id = ia.readLong("id"); int to = ia.readInt("timeout"); sessions.put(id, to); if (LOG.isTraceEnabled()) { ZooTrace.logTraceMessage(LOG, ZooTrace.SESSION_TRACE_MASK, "loadData --- session in archive: " + id + " with timeout: " + to); } count--; } // 恢复快照数据到 DataTree dt.deserialize(ia, "tree"); }
public void deserialize(InputArchive ia, String tag) throws IOException { aclCache.deserialize(ia); nodes.clear(); pTrie.clear(); String path = ia.readString("path"); // 从快照中恢复每一个 datanode 节点数据到 DataTree while (!"/".equals(path)) { // 每次循环创建一个节点对象 DataNode node = new DataNode(); ia.readRecord(node, "node"); // 将 将 DataNode 恢复到 DataTree nodes.put(path, node); synchronized (node) { aclCache.addUsage(node.acl); } int lastSlash = path.lastIndexOf('/'); if (lastSlash == -1) { root = node; } else { // 处理父节点 String parentPath = path.substring(0, lastSlash); DataNode parent = nodes.get(parentPath); if (parent == null) { throw new IOException("Invalid Datatree, unable to find parent " + parentPath + " of path " + path); } // 处理子节点 parent.addChild(path.substring(lastSlash + 1)); // 处理临时节点和永久节点 long eowner = node.stat.getEphemeralOwner(); EphemeralType ephemeralType = EphemeralType.get(eowner); if (ephemeralType == EphemeralType.CONTAINER) { containers.add(path); } else if (ephemeralType == EphemeralType.TTL) { ttls.add(path); } else if (eowner != 0) { HashSet16.3、冷启动数据恢复编辑日志list = ephemerals.get(eowner); if (list == null) { list = new HashSet (); ephemerals.put(eowner, list); } list.add(path); } } path = ia.readString("path"); } nodes.put("/", root); // we are done with deserializing the // the datatree // update the quotas - create path trie // and also update the stat nodes setupQuota(); aclCache.purgeUnused(); }
restore()
public long restore(DataTree dt, Mapsessions, PlayBackListener listener) throws IOException { // 恢复快照文件数据到 DataTree long deserializeResult = snapLog.deserialize(dt, sessions); FileTxnLog txnLog = new FileTxnLog(dataDir); RestoreFinalizer finalizer = () -> { // 恢复 编辑日志到 数据到 DataTree long highestZxid = fastForwardFromEdits(dt, sessions, listener); return highestZxid; }; ...... return finalizer.run(); }
fastForwardFromEdits()
public long fastForwardFromEdits(DataTree dt, Mapsessions, PlayBackListener listener) throws IOException { // 在此之前,已经从快照文件中恢复了大部分数据,接下来只需从快照的 zxid + 1位置开始恢复 TxnIterator itr = txnLog.read(dt.lastProcessedZxid+1); // 快照中最大的 zxid ,在执行编辑日志时,这个值会不断更新,直到所有 *** 作执行完 long highestZxid = dt.lastProcessedZxid; TxnHeader hdr; try { // 从 lastProcessedZxid 事务编号器开始,不断的从编辑日志中恢复剩下的还没有恢复的数据 while (true) { // iterator points to // the first valid txn when initialized // 获取事务头信息(有 zxid ) hdr = itr.getHeader(); if (hdr == null) { //empty logs return dt.lastProcessedZxid; } if (hdr.getZxid() < highestZxid && highestZxid != 0) { LOG.error("{}(highestZxid) > {}(next log) for type {}", highestZxid, hdr.getZxid(), hdr.getType()); } else { highestZxid = hdr.getZxid(); } try { // 根据编辑日志恢复数据到 DataTree, , 每务 执行一次,对应的事务 id,highestZxid + 1 processTransaction(hdr,dt,sessions, itr.getTxn()); } catch(KeeperException.NoNodeException e) { throw new IOException("Failed to process transaction type: " + hdr.getType() + " error: " + e.getMessage(), e); } listener.onTxnLoaded(hdr, itr.getTxn()); if (!itr.next()) break; } } finally { if (itr != null) { itr.close(); } } return highestZxid; }
processTransaction()
public void processTransaction(TxnHeader hdr,DataTree dt, Mapsessions, Record txn) throws KeeperException.NoNodeException { ProcessTxnResult rc; switch (hdr.getType()) { case OpCode.createSession: sessions.put(hdr.getClientId(), ((CreateSessionTxn) txn).getTimeOut()); if (LOG.isTraceEnabled()) { ZooTrace.logTraceMessage(LOG,ZooTrace.SESSION_TRACE_MASK,"playLog --- create session in log: 0x" + Long.toHexString(hdr.getClientId()) + " with timeout: " + ((CreateSessionTxn) txn).getTimeOut()); } // give dataTree a chance to sync its lastProcessedZxid // 创建节点、删除节点和其他的各种事务 *** 作等 rc = dt.processTxn(hdr, txn); break; case OpCode.closeSession: sessions.remove(hdr.getClientId()); if (LOG.isTraceEnabled()) { ZooTrace.logTraceMessage(LOG,ZooTrace.SESSION_TRACE_MASK,"playLog --- close session in log: 0x" + Long.toHexString(hdr.getClientId())); } rc = dt.processTxn(hdr, txn); break; default: rc = dt.processTxn(hdr, txn); } if (rc.err != Code.OK.intValue()) { LOG.debug("Ignoring processTxn failure hdr: {}, error: {}, path: {}",hdr.getType(), rc.err, rc.path); } }
processTxn()
public ProcessTxnResult processTxn(TxnHeader header, Record txn, boolean isSubTxn) { ProcessTxnResult rc = new ProcessTxnResult(); try { rc.clientId = header.getClientId(); rc.cxid = header.getCxid(); rc.zxid = header.getZxid(); rc.type = header.getType(); rc.err = 0; rc.multiResult = null; switch (header.getType()) { case OpCode.create: CreateTxn createTxn = (CreateTxn) txn; rc.path = createTxn.getPath(); createNode( createTxn.getPath(), createTxn.getData(), createTxn.getAcl(), createTxn.getEphemeral() ? header.getClientId() : 0, createTxn.getParentCVersion(), header.getZxid(), header.getTime(), null); break; case OpCode.create2: CreateTxn create2Txn = (CreateTxn) txn; rc.path = create2Txn.getPath(); Stat stat = new Stat(); createNode( create2Txn.getPath(), create2Txn.getData(), create2Txn.getAcl(), create2Txn.getEphemeral() ? header.getClientId() : 0, create2Txn.getParentCVersion(), header.getZxid(), header.getTime(), stat); rc.stat = stat; break; case OpCode.createTTL: CreateTTLTxn createTtlTxn = (CreateTTLTxn) txn; rc.path = createTtlTxn.getPath(); stat = new Stat(); createNode( createTtlTxn.getPath(), createTtlTxn.getData(), createTtlTxn.getAcl(), EphemeralType.TTL.toEphemeralOwner(createTtlTxn.getTtl()), createTtlTxn.getParentCVersion(), header.getZxid(), header.getTime(), stat); rc.stat = stat; ......17、Zookeeper选举源码解析 17.1、Zookeeper选举工作机制 17.1.1、第一次启动 17.1.2、非第一次启动 17.2、Zookeeper选举源码解析图解
选举源码解析组件图
选举源码解析流程图
QuorumPeerMain类
/**
 * Process entry point: creates a {@code QuorumPeerMain} and runs
 * {@code initializeAndRun(args)} on it.
 *
 * <p>Exit status: 0 on normal return, 2 for invalid arguments or invalid
 * config, 3 when the data directory cannot be accessed, 4 when the
 * AdminServer cannot be started, 1 for any other exception.
 *
 * @param args command-line arguments, forwarded unchanged
 */
public static void main(String[] args) {
    QuorumPeerMain main = new QuorumPeerMain();
    try {
        main.initializeAndRun(args);
    } catch (IllegalArgumentException e) {
        // Bad arguments: also show the usage text on the log and on stderr.
        LOG.error("Invalid arguments, exiting abnormally", e);
        LOG.info(USAGE);
        System.err.println(USAGE);
        System.exit(2);
    } catch (ConfigException e) {
        LOG.error("Invalid config, exiting abnormally", e);
        System.err.println("Invalid config, exiting abnormally");
        System.exit(2);
    } catch (DatadirException e) {
        LOG.error("Unable to access datadir, exiting abnormally", e);
        System.err.println("Unable to access datadir, exiting abnormally");
        System.exit(3);
    } catch (AdminServerException e) {
        LOG.error("Unable to start AdminServer, exiting abnormally", e);
        System.err.println("Unable to start AdminServer, exiting abnormally");
        System.exit(4);
    } catch (Exception e) {
        // Catch-all at the process boundary.
        LOG.error("Unexpected exception, exiting abnormally", e);
        System.exit(1);
    }
    LOG.info("Exiting normally");
    System.exit(0);
}
initializeAndRun方法
protected void initializeAndRun(String[] args) throws ConfigException, IOException, AdminServerException { QuorumPeerConfig config = new QuorumPeerConfig(); if (args.length == 1) { config.parse(args[0]); } // Start and schedule the the purge task DatadirCleanupManager purgeMgr = new DatadirCleanupManager(config .getDataDir(), config.getDataLogDir(), config .getSnapRetainCount(), config.getPurgeInterval()); purgeMgr.start(); if (args.length == 1 && config.isDistributed()) { runFromConfig(config); } else { LOG.warn("Either no config or no quorum defined in config, running " + " in standalone mode"); // there is only server in the quorum -- run as standalone ZooKeeperServerMain.main(args); } }
runFromConfig
public void runFromConfig(QuorumPeerConfig config) throws IOException, AdminServerException { try { ManagedUtil.registerLog4jMBeans(); } catch (JMException e) { LOG.warn("Unable to register log4j JMX control", e); } LOG.info("Starting quorum peer"); try { ServerCnxnFactory cnxnFactory = null; ServerCnxnFactory secureCnxnFactory = null; if (config.getClientPortAddress() != null) { cnxnFactory = ServerCnxnFactory.createFactory(); cnxnFactory.configure(config.getClientPortAddress(), config.getMaxClientCnxns(), false); } if (config.getSecureClientPortAddress() != null) { secureCnxnFactory = ServerCnxnFactory.createFactory(); secureCnxnFactory.configure(config.getSecureClientPortAddress(), config.getMaxClientCnxns(), true); } quorumPeer = getQuorumPeer(); quorumPeer.setTxnFactory(new FileTxnSnapLog( config.getDataLogDir(), config.getDataDir())); quorumPeer.enableLocalSessions(config.areLocalSessionsEnabled()); ...... quorumPeer.initialize(); quorumPeer.start(); quorumPeer.join();
start方法
@Override public synchronized void start() { if (!getView().containsKey(myid)) { throw new RuntimeException("My id " + myid + " not in the peer list"); } loadDatabase(); startServerCnxnFactory(); try { adminServer.start(); } catch (AdminServerException e) { LOG.warn("Problem starting AdminServer", e); System.out.println(e); } // 开始领导选举 startLeaderElection(); super.start(); }
startLeaderElection()
synchronized public void startLeaderElection() { try { if (getPeerState() == ServerState.LOOKING) { // 创建选票 // (1 )选票组件:epoch (leader 的任期代号)、zxid (某个 leader 当选期间执行的事务编号)、myid (serverid ) // (2 )开始选票时,都是先投自己 currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch()); } } catch(IOException e) { RuntimeException re = new RuntimeException(e.getMessage()); re.setStackTrace(e.getStackTrace()); throw re; } // if (!getView().containsKey(myid)) { // throw new RuntimeException("My id " + myid + " not in the peer list"); // } if (electionType == 0) { try { udpSocket = new DatagramSocket(getQuorumAddress().getPort()); responder = new ResponderThread(); responder.start(); } catch (SocketException e) { throw new RuntimeException(e); } } // 创建选举算法实例 this.electionAlg = createElectionAlgorithm(electionType); }
createElectionAlgorithm()
protected Election createElectionAlgorithm(int electionAlgorithm){ Election le=null; //TODO: use a factory rather than a switch switch (electionAlgorithm) { case 0: le = new LeaderElection(this); break; case 1: le = new AuthFastLeaderElection(this); break; case 2: le = new AuthFastLeaderElection(this, true); break; case 3: // 1 创建 QuorumCnxnManager,负责选举过程中的所有网络通信 QuorumCnxManager qcm = createCnxnManager(); QuorumCnxManager oldQcm = qcmRef.getAndSet(qcm); if (oldQcm != null) { LOG.warn("Clobbering already-set QuorumCnxManager (restarting leader election?)"); oldQcm.halt(); } QuorumCnxManager.Listener listener = qcm.listener; if(listener != null){ // 2 启动监听线程 listener.start(); // 3 准备开始选举 FastLeaderElection fle = new FastLeaderElection(this, qcm); fle.start(); le = fle; } else { LOG.error("Null listener when initializing cnx manager"); } break; default: assert false; } return le; }
createCnxnManager()(下面给出的是 QuorumCnxManager 的构造方法)
public QuorumCnxManager(QuorumPeer self, final long mySid, Mapview, QuorumAuthServer authServer, QuorumAuthLearner authLearner, int socketTimeout, boolean listenOnAllIPs, int quorumCnxnThreadsSize, boolean quorumSaslAuthEnabled) { // 创建各种队列 this.recvQueue = new ArrayBlockingQueue (RECV_CAPACITY); this.queueSendMap = new ConcurrentHashMap >(); this.senderWorkerMap = new ConcurrentHashMap (); this.lastMessageSent = new ConcurrentHashMap (); String cnxToValue = System.getProperty("zookeeper.cnxTimeout"); if(cnxToValue != null){ this.cnxTO = Integer.parseInt(cnxToValue); } this.self = self; this.mySid = mySid; this.socketTimeout = socketTimeout; this.view = view; this.listenonAllIPs = listenOnAllIPs; initializeAuth(mySid, authServer, authLearner, quorumCnxnThreadsSize, quorumSaslAuthEnabled); // Starts listener thread that waits for connection requests listener = new Listener(); listener.setName("QuorumPeerListener"); }
listener.start(),点击 QuorumCnxManager.Listener,找到对应的 run 方法
public void run() { int numRetries = 0; InetSocketAddress addr; Socket client = null; Exception exitException = null; ...... LOG.info("My election bind port: " + addr.toString()); setName(addr.toString()); // 绑定服务器地址 ss.bind(addr); // 死循环 while (!shutdown) { try { client = ss.accept(); setSockOpts(client); ..... } }
FastLeaderElection(this, qcm)
private void starter(QuorumPeer self, QuorumCnxManager manager) { this.self = self; proposedLeader = -1; proposedZxid = -1; // 初始化队列和信息 sendqueue = new linkedBlockingQueue17.3、zookeeper选举执行源码(); recvqueue = new linkedBlockingQueue (); this.messenger = new Messenger(manager); }
QuorumPeer.java
public synchronized void start() { if (!getView().containsKey(myid)) { throw new RuntimeException("My id " + myid + " not in the peer list"); } // 冷启动数据恢复 loadDatabase(); startServerCnxnFactory(); try { // 启动通信工厂实例对象 adminServer.start(); } catch (AdminServerException e) { LOG.warn("Problem starting AdminServer", e); System.out.println(e); } // 准备选举环境 startLeaderElection(); // 执行选举 super.start(); }17.3.1、执行 super.start()
就相当于执行 QuorumPeer.java 类中的 run() 方法。当 Zookeeper 启动后,所有服务器首先都处于 LOOKING 状态;通过选举,其中一台服务器成为 Leader,其他的服务器成为 Follower。
{ updateThreadName(); LOG.debug("Starting quorum peer"); try { jmxQuorumBean = new QuorumBean(this); MBeanRegistry.getInstance().register(jmxQuorumBean, null); for(QuorumServer s: getView().values()){ ZKMBeanInfo p; if (getId() == s.id) { p = jmxLocalPeerBean = new LocalPeerBean(this); try { MBeanRegistry.getInstance().register(p, jmxQuorumBean); } catch (Exception e) { LOG.warn("Failed to register with JMX", e); jmxLocalPeerBean = null; } } else { RemotePeerBean rBean = new RemotePeerBean(this, s); try { MBeanRegistry.getInstance().register(rBean, jmxQuorumBean); jmxRemotePeerBean.put(s.id, rBean); } catch (Exception e) { LOG.warn("Failed to register with JMX", e); } } } } catch (Exception e) { LOG.warn("Failed to register with JMX", e); jmxQuorumBean = null; } try { while (running) { switch (getPeerState()) { case LOOKING: LOG.info("LOOKING"); if (Boolean.getBoolean("readonlymode.enabled")) { LOG.info("Attempting to start ReadOnlyZooKeeperServer"); // Create read-only server but don't start it immediately final ReadOnlyZooKeeperServer roZk = new ReadOnlyZooKeeperServer(logFactory, this, this.zkDb); // Instead of starting roZk immediately, wait some grace // period before we decide we're partitioned. // // Thread is used here because otherwise it would require // changes in each of election strategy classes which is // unnecessary code coupling. 
Thread roZkMgr = new Thread() { public void run() { try { // lower-bound grace period to 2 secs sleep(Math.max(2000, tickTime)); if (ServerState.LOOKING.equals(getPeerState())) { roZk.startup(); } } catch (InterruptedException e) { LOG.info("Interrupted while attempting to start ReadOnlyZooKeeperServer, not started"); } catch (Exception e) { LOG.error("FAILED to start ReadOnlyZooKeeperServer", e); } } }; try { roZkMgr.start(); reconfigFlagClear(); if (shuttingDownLE) { shuttingDownLE = false; startLeaderElection(); } // 进行选举,选举结束,返回最终成为 Leader 胜选的那张选票 setCurrentVote(makeLEStrategy().lookForLeader()); } catch (Exception e) { LOG.warn("Unexpected exception", e); setPeerState(ServerState.LOOKING); } finally { // If the thread is in the the grace period, interrupt // to come out of waiting. roZkMgr.interrupt(); roZk.shutdown(); } } else { try { reconfigFlagClear(); if (shuttingDownLE) { shuttingDownLE = false; startLeaderElection(); } setCurrentVote(makeLEStrategy().lookForLeader()); } catch (Exception e) { LOG.warn("Unexpected exception", e); setPeerState(ServerState.LOOKING); } } break; case OBSERVING: try { LOG.info("OBSERVING"); setObserver(makeObserver(logFactory)); observer.observeLeader(); } catch (Exception e) { LOG.warn("Unexpected exception",e ); } finally { observer.shutdown(); setObserver(null); updateServerState(); } break; case FOLLOWING: try { LOG.info("FOLLOWING"); setFollower(makeFollower(logFactory)); follower.followLeader(); } catch (Exception e) { LOG.warn("Unexpected exception",e); } finally { follower.shutdown(); setFollower(null); updateServerState(); } break; case LEADING: LOG.info("LEADING"); try { setLeader(makeLeader(logFactory)); leader.lead(); setLeader(null); } catch (Exception e) { LOG.warn("Unexpected exception",e); } finally { if (leader != null) { leader.shutdown("Forcing shutdown"); setLeader(null); } updateServerState(); } break; } start_fle = Time.currentElapsedTime(); } } finally { LOG.warn("QuorumPeer main thread exited"); 
MBeanRegistry instance = MBeanRegistry.getInstance(); instance.unregister(jmxQuorumBean); instance.unregister(jmxLocalPeerBean); for (RemotePeerBean remotePeerBean : jmxRemotePeerBean.values()) { instance.unregister(remotePeerBean); } jmxQuorumBean = null; jmxLocalPeerBean = null; jmxRemotePeerBean = null; } }17.3.2、setCurrentVote(makeLEStrategy().lookForLeader());
ctrl+alt+b 点击 lookForLeader()的实现类 FastLeaderElection.java
public Vote lookForLeader() throws InterruptedException { try { self.jmxLeaderElectionBean = new LeaderElectionBean(); MBeanRegistry.getInstance().register( self.jmxLeaderElectionBean, self.jmxLocalPeerBean); } catch (Exception e) { LOG.warn("Failed to register with JMX", e); self.jmxLeaderElectionBean = null; } if (self.start_fle == 0) { self.start_fle = Time.currentElapsedTime(); } try { // 正常启动中,所有其他服务器,都会给我发送一个投票 // 保存每一个服务器的最新合法有效的投票 HashMap17.3.3、 sendNotifications();recvset = new HashMap (); // 存储合法选举之外的投票结果 HashMap outofelection = new HashMap (); // 一次选举的最大等待时间,默认值是 0.2s int notTimeout = finalizeWait; // 每发起一轮选举,logicalclock++ // 在没有合法的 epoch 数据之前,都使用逻辑时钟代替 // 选举 leader 的规则:依次比较 epoch(任期) zxid(事务 id) serverid (myid) 谁大谁当选 leader synchronized(this){ // 更新逻辑时钟,每进行一次选举,都需要更新逻辑时钟 logicalclock.incrementAndGet(); // 更新选票(serverid, zxid, epoch), updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch()); } LOG.info("New election. My id = " + self.getId() + ", proposed zxid=0x" + Long.toHexString(proposedZxid)); // 广播选票,把自己的选票发给其他服务器 sendNotifications(); // 一轮一轮的选举直到选举成功 while ((self.getPeerState() == ServerState.LOOKING) && (!stop)){ ...... return null; } finally { ...... } }
广播选票,把自己的选票发给其他服务器
private void sendNotifications() { // 遍历投票参与者,给每台服务器发送选票 for (long sid : self.getCurrentAndNextConfigVoters()) { QuorumVerifier qv = self.getQuorumVerifier(); // 创建发送选票 ToSend notmsg = new ToSend(ToSend.mType.notification, proposedLeader, proposedZxid, logicalclock.get(), QuorumPeer.ServerState.LOOKING, sid, proposedEpoch, qv.toString().getBytes()); if(LOG.isDebugEnabled()){ LOG.debug("Sending Notification: " + proposedLeader + " (n.leader), 0x" + Long.toHexString(proposedZxid) + " (n.zxid), 0x" + Long.toHexString(logicalclock.get()) + " (n.round), " + sid + " (recipient), " + self.getId() + " (myid), 0x" + Long.toHexString(proposedEpoch) + " (n.peerEpoch)"); } // 把发送选票放入发送队列 sendqueue.offer(notmsg); } }17.3.4、在 FastLeaderElection.java 类中查找 WorkerSender 线程
{ volatile boolean stop; QuorumCnxManager manager; WorkerSender(QuorumCnxManager manager){ super("WorkerSender"); this.stop = false; this.manager = manager; } public void run() { while (!stop) { try { // 队列阻塞,时刻准备接收要发送的选票 ToSend m = sendqueue.poll(3000, TimeUnit.MILLISECONDS); if(m == null) continue; // 处理要发送的选票 process(m); } catch (InterruptedException e) { break; } } LOG.info("WorkerSender is down"); } void process(ToSend m) { ByteBuffer requestBuffer = buildMsg(m.state.ordinal(), m.leader, m.zxid, m.electionEpoch, m.peerEpoch, m.configData); // 发送选票 manager.toSend(m.sid, requestBuffer); } }
toSend(m.sid, requestBuffer)
public void toSend(Long sid, ByteBuffer b) { // 判断如果是发给自己的消息,直接进入自己的 RecvQueue if (this.mySid == sid) { b.position(0); addToRecvQueue(new Message(b.duplicate(), sid)); } else { // 如果是发给其他服务器,创建对应的发送队列或者获取已经存在的发送队列 // ,并把要发送的消息放入该队列 ArrayBlockingQueue17.3.5、addToRecvQueue(new Message(b.duplicate(), sid));bq = new ArrayBlockingQueue ( SEND_CAPACITY); ArrayBlockingQueue oldq = queueSendMap.putIfAbsent(sid, bq); if (oldq != null) { addToSendQueue(oldq, b); } else { addToSendQueue(bq, b); } // 将选票发送出去 connectOne(sid); } }
如果数据是发送给自己的,添加到自己的接收队列
public void addToRecvQueue(Message msg) { synchronized(recvQLock) { if (recvQueue.remainingCapacity() == 0) { try { recvQueue.remove(); } catch (NoSuchElementException ne) { // element could be removed by poll() LOG.debug("Trying to remove from an empty " + "recvQueue. Ignoring exception " + ne); } } try { // 将发送给自己的选票添加到 recvQueue 队列 recvQueue.add(msg); } catch (IllegalStateException ie) { // This should never happen LOG.error("Unable to insert element in the recvQueue " + ie); } } }17.3.6、addToSendQueue(oldq, b);
数据添加到发送队列
private void addToSendQueue(ArrayBlockingQueue17.3.7、 connectOne(long sid)queue, ByteBuffer buffer) { if (queue.remainingCapacity() == 0) { try { queue.remove(); } catch (NoSuchElementException ne) { // element could be removed by poll() LOG.debug("Trying to remove from an empty " + "Queue. Ignoring exception " + ne); } } try { // 将要发送的消息添加到发送队列 queue.add(buffer); } catch (IllegalStateException ie) { // This should never happen LOG.error("Unable to insert an element in the queue " + ie); } }
与要发送的服务器节点建立通信连接
synchronized void connectOne(long sid){ if (senderWorkerMap.get(sid) != null) { LOG.debug("There is a connection already for server " + sid); return; } synchronized (self.QV_LOCK) { boolean knownId = false; // Resolve hostname for the remote server before attempting to // connect in case the underlying ip address has changed. self.recreateSocketAddresses(sid); Map17.3.8、startConnection(sock, sid);lastCommittedView = self.getView(); QuorumVerifier lastSeenQV = self.getLastSeenQuorumVerifier(); Map lastProposedView = lastSeenQV.getAllMembers(); if (lastCommittedView.containsKey(sid)) { knownId = true; if (connectOne(sid, lastCommittedView.get(sid).electionAddr)) return; } if (lastSeenQV != null && lastProposedView.containsKey(sid) && (!knownId || (lastProposedView.get(sid).electionAddr != lastCommittedView.get(sid).electionAddr))) { knownId = true; if (connectOne(sid, lastProposedView.get(sid).electionAddr)) return; } if (!knownId) { LOG.warn("Invalid server id: " + sid); return; } } }
创建并启动发送器线程和接收器线程
private boolean startConnection(Socket sock, Long sid) throws IOException { DataOutputStream dout = null; DataInputStream din = null; try { // Use BufferedOutputStream to reduce the number of IP packets. This is // important for x-DC scenarios. // 通过输出流,向服务器发送数据 BufferedOutputStream buf = new BufferedOutputStream(sock.getOutputStream()); dout = new DataOutputStream(buf); // Sending id and challenge // represents protocol version (in other words - message type) dout.writeLong(PROTOCOL_VERSION); dout.writeLong(self.getId()); String addr = formatInetAddr(self.getElectionAddress()); byte[] addr_bytes = addr.getBytes(); dout.writeInt(addr_bytes.length); dout.write(addr_bytes); dout.flush(); // 通过输入流读取对方发送过来的选票 din = new DataInputStream( new BufferedInputStream(sock.getInputStream())); } catch (IOException e) { LOG.warn("Ignoring exception reading or writing challenge: ", e); closeSocket(sock); return false; } // authenticate learner QuorumPeer.QuorumServer qps = self.getVotingView().get(sid); if (qps != null) { // TODO - investigate why reconfig makes qps null. authLearner.authenticate(sock, qps.hostname); } // If lost the challenge, then drop the new connection // 如果对方的 id 比我的大,我是没有资格给对方发送连接请求的,直接关闭自己的客户端 if (sid > self.getId()) { LOG.info("Have smaller server identifier, so dropping the " + "connection: (" + sid + ", " + self.getId() + ")"); closeSocket(sock); // Otherwise proceed with the connection } else { // 初始化,发送器 和 接收器 SendWorker sw = new SendWorker(sock, sid); RecvWorker rw = new RecvWorker(sock, din, sid, sw); sw.setRecv(rw); SendWorker vsw = senderWorkerMap.get(sid); if(vsw != null) vsw.finish(); senderWorkerMap.put(sid, sw); queueSendMap.putIfAbsent(sid, new ArrayBlockingQueue17.3.9、sw.start();( SEND_CAPACITY)); // 启动发送器线程和接收器线程 sw.start(); rw.start(); return true; } return false; }
点击 SendWorker,并查找该类下的 run 方法
public void run() { threadCnt.incrementAndGet(); try { ArrayBlockingQueuebq = queueSendMap.get(sid); if (bq == null || isSendQueueEmpty(bq)) { ByteBuffer b = lastMessageSent.get(sid); if (b != null) { LOG.debug("Attempting to send lastMessage to sid=" + sid); send(b); } } } catch (IOException e) { LOG.error("Failed to send last message. Shutting down thread.", e); this.finish(); } try { // 只要连接没有断开 while (running && !shutdown && sock != null) { ByteBuffer b = null; try { ArrayBlockingQueue bq = queueSendMap .get(sid); if (bq != null) { // 不断从发送队列 SendQueue 中,获取发送消息,并执行发送 b = pollSendQueue(bq, 1000, TimeUnit.MILLISECONDS); } else { LOG.error("No queue of incoming messages for " + "server " + sid); break; } if(b != null){ // 更新对于 sid 这台服务器的最近一条消息 lastMessageSent.put(sid, b); // 执行发送 send(b); } } catch (InterruptedException e) { LOG.warn("Interrupted while waiting for message on queue", e); } } } catch (Exception e) { LOG.warn("Exception when using channel: for id " + sid + " my id = " + QuorumCnxManager.this.mySid + " error = " + e); } this.finish(); LOG.warn("Send worker leaving thread " + " id " + sid + " my id = " + self.getId()); } }
send(b)
synchronized void send(ByteBuffer b) throws IOException { byte[] msgBytes = new byte[b.capacity()]; try { b.position(0); b.get(msgBytes); } catch (BufferUnderflowException be) { LOG.error("BufferUnderflowException ", be); return; } // 输出流向外发送 dout.writeInt(b.capacity()); dout.write(b.array()); dout.flush(); }17.3.10、 rw.start();
点击 RecvWorker,并查找该类下的 run 方法
public void run() { threadCnt.incrementAndGet(); try { // 只要连接没有断开 while (running && !shutdown && sock != null) { int length = din.readInt(); if (length <= 0 || length > PACKETMAXSIZE) { throw new IOException( "Received packet with invalid packet: " + length); } byte[] msgArray = new byte[length]; // 输入流接收消息 din.readFully(msgArray, 0, length); ByteBuffer message = ByteBuffer.wrap(msgArray); // 接收对方发送过来的选票 addToRecvQueue(new Message(message.duplicate(), sid)); } } catch (Exception e) { LOG.warn("Connection broken for id " + sid + ", my id = " + QuorumCnxManager.this.mySid + ", error = " , e); } finally { LOG.warn("Interrupting SendWorker"); sw.finish(); closeSocket(sock); } }
addToRecvQueue(Message msg)
public void addToRecvQueue(Message msg) { synchronized(recvQLock) { if (recvQueue.remainingCapacity() == 0) { try { recvQueue.remove(); } catch (NoSuchElementException ne) { // element could be removed by poll() LOG.debug("Trying to remove from an empty " + "recvQueue. Ignoring exception " + ne); } } try { // 将接收到的消息,放入接收消息队列 recvQueue.add(msg); } catch (IllegalStateException ie) { // This should never happen LOG.error("Unable to insert element in the recvQueue " + ie); } } }
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)