该阶段主要是leader同步数据给follower,具体见源码分析。
2.关键对象Zxid:每一个事务id或者newEpochZxid都可以用Zxid表示,事务id的组成为(epoch|counter),epoch表示发送该事务的leader的currentEpoch,counter从1开始,该leader没发送一个事务就会自增该值;高32位为epoch,低32位为counter。Zxid规定了事务的total order
newEpochZxid:组成为(epoch|0),是一条特殊的Zxid,仅仅用来表示一个newEpoch。
peerLastZxid:该变量表示待同步节点(follower/observer)的包含的最后一条事务log的zxid。
commitedLog:保存在内存中,保存已提交的事务log;maxCommittedLog,minCommittedLog分别表示该对象保存的最大和最小的事务zxid。
lastProcessedZxid:表示节点最后一条事务zxid,保存在内存中。
preProposalZxid:表示leader节点的磁盘事务log或者内存committedLog中小于peerLastZxid的最大的事务Zxid;用于在TRUNC同步方式中,通知待同步节点删除其磁盘log中大于preProposalZxid的事务log。
queuePacket:发送队列,数据同步阶段leader会将其DIFF/TRUNC/SNAP/COMMIT/ACK…消息,和数据消息(类型是PROPOSAL)放到该队列中。
currentEpoch:表示每一个节点的当前epoch,保存在磁盘中,数据同步结束后会更新该变量(这个机制是保证zk集群安全性的关键);该变量用于leader选举阶段,候选者比较的epoch就是该变量,更新于数据同步后。
acceptEpoch:表示每一个节点看到的最大的epoch,保存在磁盘中。
3.关键类和方法 1.leader节点运行的方法 Leader.java//Leader.java zk3.7.0 //选举完成后leader会执行该方法。 void lead() throws IOException, InterruptedException { // ...... try { // Start thread that waits for connection requests from // new followers. cnxAcceptor = new LearnerCnxAcceptor(); //开启一个thread进行数据同步,该方法的实际代码见LearnerHandler.java#run() cnxAcceptor.start(); // ...... // We have to get at least a majority of servers in sync with // us. We do this by waiting for the NEWLEADER packet to get // acknowledged //等待quorum的节点ack了NEWLEADER消息。 waitForEpochAck(self.getId(), leaderStateSummary); //结束。数据同步完成后,修改leader的当前epoch。 self.setCurrentEpoch(epoch); //......后面就是为集群提供正常服务。 }LearnerHandler.java
//该线程就是leader节点正常处理数据同步的代码,每一个节点,leader都会启动这样一个线程处理其数据同步。 public void run() { try { // ...... //-----------------------------------------------------接收FOLLOWERINFO/OBSERVERINFO----------------- //2。下面2行,接受follower或者obserer发送的消息FOLLOWERINFO(OBSERVERINFO),qp对象即消息对象 QuorumPacket qp = new QuorumPacket(); //读取一条其他节点发来的消息 ia.readRecord(qp, "packet"); //如果该消息不是一条FOLLOWERINFO/OBSERVERINFO消息,则报错并返回;即此时leader接受的第一条消息只能是 //FOLLOWERINFO/OBSERVERINFO消息 if (qp.getType() != Leader.FOLLOWERINFO && qp.getType() != Leader.OBSERVERINFO) { LOG.error("First packet {} is not FOLLOWERINFO or OBSERVERINFO!", qp.toString()); return; } // ...... //2。根据消息的zxid,得出待同步节点的lastAcceptedEpoch long lastAcceptedEpoch = ZxidUtils.getEpochFromZxid(qp.getZxid()); //该数据主要用来表示待同步节点的最后一条处理的事务zxid。 long peerLastZxid; StateSummary ss = null; long zxid = qp.getZxid(); //2。the first zxid of the next epoch long newEpoch = learnerMaster.getEpochToPropose(this.getSid(), lastAcceptedEpoch); //2。根据newEpoch获得新leader的newLeaderZxid(该zxid不用于标记事务log),事务zxid都是由(epoch|1->n) //即事务zxid的后32位不为0,而newLeaderZxid为(epoch|0),只是用来表示一个newEpoch,不要来标记事务。 long newLeaderZxid = ZxidUtils.makeZxid(newEpoch, 0); //--------------------------------------------------------发送LEADERINFO--------------------------------- //2。leader 向待同步节点发送(包含newEpoch的消息)LEADERINFO消息 QuorumPacket newEpochPacket = new QuorumPacket(Leader.LEADERINFO, newLeaderZxid, ver, null); oa.writeRecord(newEpochPacket, "packet"); // ...... //--------------------------------------------------------接收ACKEPOCH------------------------------- // 构造一个接受ACKEPOCH消息的对象 QuorumPacket ackEpochPacket = new QuorumPacket(); ia.readRecord(ackEpochPacket, "packet"); //如果收到的消息不是ACKEPOCH,则报错并返回。 if (ackEpochPacket.getType() != Leader.ACKEPOCH) { LOG.error("{} is not ACKEPOCH", ackEpochPacket.toString()); return; } ByteBuffer bbepoch = ByteBuffer.wrap(ackEpochPacket.getData()); //4。根据当前发送者节点,发送的ACKEPOCH消息,包装成一个StateSummary对象。 ss = new StateSummary(bbepoch.getInt(), ackEpochPacket.getZxid()); //4。wait for the leader of the new epoch to be confirmed by followers, //等待收到收到quorum的节点的ACKEPOCH learnerMaster.waitForEpochAck(this.getSid(), ss); //---------------------------------------------根据情况判断同步方式DIFF/SNAP/TRUNC-------------------- //获得follower/observer节点包含的最后的zxid peerLastZxid = ss.getLastZxid(); // 5。Take any necessary action if we need to send TRUNC or DIFF,开始同步 // startForwarding() will be called in all cases //执行同步的代码,LearnerHandler#syncFollower(),根据情况判断使用的同步方式 boolean needSnap = syncFollower(peerLastZxid, learnerMaster); // ...... //5。需要快照同步 if (needSnap) { // ...... try { //当前leader待发送的快照包括此leader处理过的最大的事务的zxid,并发送一条SNAP给待同步节点 long zxidToSend = learnerMaster.getZKDatabase().getDataTreeLastProcessedZxid(); //向待发送队列中写入一条SNAP消息。 oa.writeRecord(new QuorumPacket(Leader.SNAP, zxidToSend, null, null), "packet"); // ...... // Dump data to peer,将该快照序列化后发送给待同步节点 learnerMaster.getZKDatabase().serializeSnapshot(oa); oa.writeString("BenWasHere", "signature"); bufferedOutput.flush(); } // ...... // the version of this quorumVerifier will be set by leader.lead() in case // the leader is just being established. waitForEpochAck makes sure that readyToStart is true if // we got here, so the version was set //6。在待发送队列中添加NEWLEADER消息,并附带newLeaderZxid。 //此时待发送队列中有DIFF/TRUNC,待同步的事务消息,此时又加入了NEWLEADER消息。 //注意,还没有实际将数据发送给待同步节点。 if (getVersion() < 0x10000) { QuorumPacket newLeaderQP = new QuorumPacket(Leader.NEWLEADER, newLeaderZxid, null, null); oa.writeRecord(newLeaderQP, "packet"); } else { QuorumPacket newLeaderQP = new QuorumPacket(Leader.NEWLEADER, newLeaderZxid, learnerMaster.getQuorumVerifierBytes(), null); queuedPackets.add(newLeaderQP); } bufferedOutput.flush(); // 6。Start thread that blast packets in the queue to learner,实际将待发送队列的消息依次发送给待同步节点。 startSendingPackets(); //---------------------------------------------等待ACK-------------------------------- qp = new QuorumPacket(); ia.readRecord(qp, "packet"); //收到的消息不是ACK,则退出 if (qp.getType() != Leader.ACK) { LOG.error("Next packet was supposed to be an ACK, but received packet: {}", packetToString(qp)); return; } //等待编号为sid的待同步节点ack了NEWLEADER消息,该待同步节点与leader数据同步流程结束; //注意:此时集群中还有其他待同步节点,leader的同步阶段还没有结束。 learnerMaster.waitForNewLeaderAck(getSid(), qp.getZxid()); // ...... //---------------------------------------------发送uptodate给待同步阶段------------------------- // This message type is sent by the leader to indicate that the follower is // * now uptodate andt can start responding to clients. //此时该待同步节点的数据同步阶段已经完成,leader发送UPTODATE,给该节点;表示它可以响应客户端的请求。 queuedPackets.add(new QuorumPacket(Leader.UPTODATE, -1, null, null)); // ...... }
// LearnerHandler.java zk3.7.0 //5。数据同步 boolean syncFollower(long peerLastZxid, LearnerMaster learnerMaster) { //该变量主要判断待同步节点的peerLastZxid是不是一个newEpochZxid。 boolean isPeerNewEpochZxid = (peerLastZxid & 0xffffffffL) == 0; // Keep track of the latest zxid which already queued long currentZxid = peerLastZxid; boolean needSnap = true; // ...... try { // ...... //设置值,committedLog为内存结构,DataTree也是内存结构 long maxCommittedLog = db.getmaxCommittedLog(); long minCommittedLog = db.getminCommittedLog(); long lastProcessedZxid = db.getDataTreeLastProcessedZxid(); // ...... if (db.getCommittedLog().isEmpty()) { minCommittedLog = lastProcessedZxid; maxCommittedLog = lastProcessedZxid; } //注意在上面原文注释中的第4条,这种情况下也可能会使用SNAP方式同步。 if (forceSnapSync) { // Force learnerMaster to use snapshot to sync with follower,仅用于测试 LOG.warn("Forcing snapshot sync - should not see this in production"); } //如果lastProcessedZxid == peerLastZxid,即使用DIFF方式同步,这种情况其实并没有发送数据过去, //只是发了一条DIFF消息而已,即发送了一条空的DIFF //因此这种情况可以不考虑,没什么意义。 else if (lastProcessedZxid == peerLastZxid) { // Follower is already sync with us, send empty diff queueOpPacket(Leader.DIFF, peerLastZxid); needOpPacket = false; needSnap = false; } //如果peerLastZxid > maxCommittedLog大,并且peerLastZxid的确代表一条事务zxid,使用TRUNC方式同步。 else if (peerLastZxid > maxCommittedLog && !isPeerNewEpochZxid) { // Newer than committedLog, send trunc and done //该方法的maxCommittedLog表示,节点接受到TRUNC消息后,要删除其磁盘事务log中zxid大于maxCommittedLog //的事务log queueOpPacket(Leader.TRUNC, maxCommittedLog); currentZxid = maxCommittedLog; needOpPacket = false; needSnap = false; } //如果peerLastZxid在minCommittedLog和maxCommittedLog之间 else if ((maxCommittedLog >= peerLastZxid) && (minCommittedLog <= peerLastZxid)) { // Follower is within commitLog range //获取一个迭代器用于迭代内存中的commitedLog对象 Iteratoritr = db.getCommittedLog().iterator(); //此时也不一定会使用TRUNC或DIFF,还有可能使用SNAP, //看具体的该方法分析LearnerHandler#queueCommittedProposals, //刚进入该函数时,needOpPacket == true ********,下同 currentZxid = queueCommittedProposals(itr, peerLastZxid, null, maxCommittedLog); needSnap = false; } //如果peerLastZxid比minCommittedLog小,此时如果磁盘上的事务log可用, //那么就使用磁盘中的事务log和内存committedLog共同进行数据同步。 else if (peerLastZxid < minCommittedLog && txnLogSyncEnabled) { // Use txnlog and committedLog to sync,此时也不一定使用SNAP,也可能使用DIFF或者TRUNC方式。 // Calculate sizeLimit that we allow to retrieve txnlog from disk long sizeLimit = db.calculateTxnLogSizeLimit(); // This method can return empty iterator if the requested zxid // is older than on-disk txnlog 获取一个迭代器, //该迭代器可以从第一个 < peerLastZxid的log文件开始一个文件一个文件遍历里面的事务记录, //如果找不到满足条件的迭代器,就使用SNAP同步。 Iterator txnLogItr = db.getProposalsFromTxnLog(peerLastZxid, sizeLimit); //如果迭代器中可以取出事务 if (txnLogItr.hasNext()) { LOG.info("Use txnlog and committedLog for peer sid: {}", getSid()); //返回待同步给follower的事务队列中的最后一条事务的zxid,最大为minCommittedLog currentZxid = queueCommittedProposals(txnLogItr, peerLastZxid, minCommittedLog, maxCommittedLog); //如果待发送给follower的事务进行同步的最大的zxid没有超过minCommittedLog,需要通过snapshot同步。 //快照同步的时候,首先把我们刚才待发送的同步事务清空。 if (currentZxid < minCommittedLog) { currentZxid = peerLastZxid; // Clear out currently queued requests and revert // to sending a snapshot. queuedPackets.clear(); //此时needOpPacket == true。******** needOpPacket = true; } else { //如果待发送队列中的的最大事务zxid等于了minCommittedLog, //则像处理peerLastZxid >=minCommitLog <=maxCommitedLog的情况一样处理。 Iterator committedLogItr = db.getCommittedLog().iterator(); currentZxid = queueCommittedProposals(committedLogItr, currentZxid, null, maxCommittedLog); needSnap = false; } } } // ...... } //就上面代码执行后needSnap等于false,但是由于上面******处将needOpPacket设置为了true,因此该条件成立 //又会设置needSnap = true;因此仍然使用SNAP同步。 if (needOpPacket && !needSnap) { // This should never happen, but we should fall back to sending // snapshot just in case. LOG.error("Unhandled scenario for peer sid: {} fall back to use snapshot", getSid()); needSnap = true; } return needSnap; }
//上面的原文注释告诉我们,该方法会把leader节点上(peerLaxtZxid, maxZxid]范围的事务放到一个待发送队列中, //并返回待发送队列中最后一条事务的zxid,当然这是正常返回时的逻辑,该方法正常返回时,needOpPacket == false****** protected long queueCommittedProposals(Iterator2.follower节点运行的方法(observer也类似) follower.javaitr, long peerLastZxid, Long maxZxid, Long lastCommittedZxid) {//5。数据同步 boolean isPeerNewEpochZxid = (peerLastZxid & 0xffffffffL) == 0; long queuedZxid = peerLastZxid; // as we look through proposals, this variable keeps track of previous // proposal Id. long prevProposalZxid = -1; while (itr.hasNext()) { //从迭代器中获得下一个事务。 Proposal propose = itr.next(); //pacKetZxid: propose的Zxid long packetZxid = propose.packet.getZxid(); // abort if we hit the limit if ((maxZxid != null) && (packetZxid > maxZxid)) { break; } // skip the proposals the peer already has //跳过比peerLastZxid小的propose的Zxid。 if (packetZxid < peerLastZxid) { prevProposalZxid = packetZxid; continue; } //到这里说明:packetZxid >= peerLastZxid ; //prevProposalZxid为迭代器遍历的集合中小于peerLastZxid的最大的Zxid。 // If we are sending the first packet, figure out whether to trunc // or diff //此时needOpPacket = true***** if (needOpPacket) { // Send diff when we see the follower's zxid in our history, //如果能够在leader中找到peerLastZxid表示的事务,则直接发送DIFF消息 if (packetZxid == peerLastZxid) { //DIFF消息中,lastCommittedZxid表示将待同步节点同步到lastCommittedZxid。 queueOpPacket(Leader.DIFF, lastCommittedZxid); //执行完后该变量变为false needOpPacket = false; continue; } //那么此时可以判断出packetZxid > peerLastZxid,那么说明peerLastZxid表示的事务,该leadeer可能没有。 //如果该peerLastZxid是一条newEpochZxid,则不用理会,因为没有事务的zxid是这样的,正常发送DIFF消息即可。 if (isPeerNewEpochZxid) { // Send diff and fall through if zxid is of a new-epoch queueOpPacket(Leader.DIFF, lastCommittedZxid); needOpPacket = false; } //如果peerLastZxid确实是一条事务的zxid,那么说明leader没有这条事务 else if (packetZxid > peerLastZxid) { // it may used to be a leader, //如果两个zxid的epoch不相同,则不能使用TRUNC!!!. if (ZxidUtils.getEpochFromZxid(packetZxid) != ZxidUtils.getEpochFromZxid(peerLastZxid)) { // We cannot send TRUNC that cross epoch boundary. // The learner will crash if it is asked to do so. // We will send snapshot this those cases. LOG.warn("Cannot send TRUNC to peer sid: " + getSid() + " peer zxid is from different epoch"); //注意这个时候直接结束了该方法,此时queuedZxid = peerLastZxid; //并且needOpPacket = true********** return queuedZxid; } //如果epoch相同。 //发送TRUNC消息,preProposalZxid为该节点和leader共同拥有的最后一条事务的zxid, //即TRUNC消息通知待同步节点将其磁盘log删除preProposalZxid之后的。 queueOpPacket(Leader.TRUNC, prevProposalZxid); needOpPacket = false; } } // ....... // Since this is already a committed proposal, we need to follow // it by a commit packet //leader将follower没有的事务 propose给follower,此处开始真正的发送同步数据。 queuePacket(propose.packet); //propose后,附带发送一个commit消息,因为数据同步阶段的事务,肯定早已被提交了, //因此,此时可以直接在propose后,附带commit消息 queueOpPacket(Leader.COMMIT, packetZxid); queuedZxid = packetZxid; } // ...... //返回待发送队列中,最大的事务Zxid。 return queuedZxid; }
//follower节点在leader选举完成后的执行方法 void followLeader() throws InterruptedException { // ...... try { // ...... try { // ....... //---------------------------------------------------FOLLOWERINFO----------------------------------- //1。向leader发送FOLLOWERINFO消息,更新AcceptEpoch //并获取newEpochZxid,详细见Learner#registerWithLeader() long newEpochZxid = registerWithLeader(Leader.FOLLOWERINFO); // ...... //check to see if the leader zxid is lower than ours //this should never happen but is just a safety check //如果newEpochZxid < 该节点的acceptEpoch long newEpoch = ZxidUtils.getEpochFromZxid(newEpochZxid); if (newEpoch < self.getAcceptedEpoch()) { LOG.error("Proposed leader epoch " + ZxidUtils.zxidToString(newEpochZxid) + " is less than our accepted epoch " + ZxidUtils.zxidToString(self.getAcceptedEpoch())); throw new IOException("Error: Epoch of leader is lower"); } // ...... try { //开始真正同步数据Learner#syncWithLeader() syncWithLeader(newEpochZxid); // ...... } // ....... }Learner.java
protected long registerWithLeader(int pktType) throws IOException { //获取lastLoggedZxid = LastProcessedZxid,pktType = FOLLOWERINFO,Zxid = AcceptedEpoch。 long lastLoggedZxid = self.getLastLoggedZxid(); QuorumPacket qp = new QuorumPacket(); qp.setType(pktType); qp.setZxid(ZxidUtils.makeZxid(self.getAcceptedEpoch(), 0)); // ...... //1。首先向Leader发送FOLLOWERINFO消息 writePacket(qp, true); //3。从leader接受响应 readPacket(qp); //3。获取新leader的newEpoch final long newEpoch = ZxidUtils.getEpochFromZxid(qp.getZxid()); //-----------------------------------------------------接受LEADERINFO------------------------------------ //如果收到的消息是LEADERINFO类型 if (qp.getType() == Leader.LEADERINFO) { // ...... //3。newEpoch>该节点的AcceptEpoch则更新该节点的AcceptEpoch if (newEpoch > self.getAcceptedEpoch()) { wrappedEpochBytes.putInt((int) self.getCurrentEpoch()); self.setAcceptedEpoch(newEpoch); } //----------------------------------------------------发送ACKEPOCH-------------------------------------- // ...... //3。回复ACKEPOCH消息 QuorumPacket ackNewEpoch = new QuorumPacket(Leader.ACKEPOCH, lastLoggedZxid, epochBytes, null); writePacket(ackNewEpoch, true); //3。返回newEpochZxid return ZxidUtils.makeZxid(newEpoch, 0); } // ...... }
protected void syncWithLeader(long newLeaderZxid) throws Exception { // ...... // In the DIFF case we don't need to do a snapshot because the transactions will sync on top of any existing snapshot // For SNAP and TRUNC the snapshot is needed to save that history //该变量表明是否需要执行快照 *** 作,该变量和数据同步方式SNAP/DIFF/TRUNC无关,只是表明在同步过程中是否需要执行快照; //当然对于SNAP方式,肯定需要执行快照。 boolean snapshotNeeded = true; //表明进行快照 *** 作时的的方式是同步/异步,false表明是异步。 boolean syncSnapshot = false; //获取一个消息。 readPacket(qp); //待提交队列,待提交的事务保存到该队列中,准备提交。 Deque4.总结 1.流程和注意点packetsCommitted = new ArrayDeque<>(); //该队列保存数据同步阶段,接受leader发送过来的事务。 Deque packetsNotCommitted = new ArrayDeque<>(); synchronized (zk) { //如果是DIFF同步方式 if (qp.getType() == Leader.DIFF) { // ...... //如果强制收到leader的同步数据应用到自身Datatree后,同步快照;则在数据同步完成后,需要保存快照。 //保存快照的时机在收到NEWLEADER消息后 if (zk.shouldForceWriteInitialSnapshotAfterLeaderElection()) { snapshotNeeded = true; syncSnapshot = true; } //如果不强制数据同步后,同步快照,则不需要再数据同步后保存快照。 //boolean writeToTxnLog = !snapshotNeeded;但是需要再ACK之前,将数据同步阶段收到的事务写入磁盘事务log else { snapshotNeeded = false; } } //如果使用SNAP方式,该代码执行后,snapshotNeeded==true;即SNAP数据同步后,需要保存快照。 //保存快照的时机在收到NEWLEADER消息后 else if (qp.getType() == Leader.SNAP) { // ...... // The leader is going to dump the database // db is clear as part of deserializeSnapshot() //反序列化快照数据 zk.getZKDatabase().deserializeSnapshot(leaderIs); // ...... //设置该节点的lastProcessedZxid zk.getZKDatabase().setlastProcessedZxid(qp.getZxid()); // immediately persist the latest snapshot when there is txn log gap //使用同步方式进行快照。 syncSnapshot = true; } //如果是TRUNC方式,该代码执行后,snapshotNeeded==true;即TRUNC数据同步后,需要保存快照。 //保存快照的时机在收到NEWLEADER消息后 else if (qp.getType() == Leader.TRUNC) { //we need to truncate the log to the lastzxid of the leader //该方法:1.关闭打开的事务log文件快照文件 // 2.删除磁盘事务log中在TRUNC消息中附带的zxid之后的事务 // 3.从快照和事务log中重新加载数据到内存 boolean truncated = zk.getZKDatabase().truncateLog(qp.getZxid()); // ...... //设置该节点的lastProcessedZxid zk.getZKDatabase().setlastProcessedZxid(qp.getZxid()); } // ...... long lastQueued = 0; // in Zab V1.0 (ZK 3.4+) we might take a snapshot when we get the NEWLEADER message, but in pre V1.0 // we take the snapshot on the UPDATE message, since Zab V1.0 also gets the UPDATE (after the NEWLEADER) // we need to make sure that we don't take the snapshot twice. boolean isPreZAB1_0 = true; //If we are not going to take the snapshot be sure the transactions are not applied in memory // but written out to the transaction log //如果在数据同步后,不执行快照 *** 作,则需要在ACK之前在磁盘写入事务log boolean writeToTxnLog = !snapshotNeeded; TxnLogEntry logEntry; // we are now going to start getting transactions to apply followed by an UPTODATE //收到UPTODAE消息后,将退出outerLoop循环,该follower/observer此时可以处理客户端请求 outerLoop: while (self.isRunning()) { //接收leader发来的消息 readPacket(qp); switch (qp.getType()) { //如果是PROPOSAL消息,该消息包含事务数据 case Leader.PROPOSAL: //该对象保存反序列化后的消息 PacketInFlight pif = new PacketInFlight(); //反序列化出消息中的事务数据 logEntry = SerializeUtils.deserializeTxn(qp.getData()); // ....... //packetsNotCommitted队列中加入该消息 packetsNotCommitted.add(pif); break; //如果是commit消息 case Leader.COMMIT: //从packetsNotCommitted队列中获取第一个消息 pif = packetsNotCommitted.peekFirst(); // ...... //如果不需要写入磁盘事务log;只要需要在数据同步后,保存快照,就不需要写入磁盘事务log。 if (!writeToTxnLog) { //直接应用到内存结构中,不需要写入磁盘log文件,对应于TRUNC方式,或者DIFF需要强制进行快照的情况 zk.processTxn(pif.hdr, pif.rec); packetsNotCommitted.remove(); } //需要则将该消息中的事务Zxid保存到packetsCommitted队列中 else { packetsCommitted.add(qp.getZxid()); } break; // ...... //如果收到的是UPTODATE消息 case Leader.UPTODATE: // ...... // 退出外层循环 break outerLoop; //如果收到的是NEWLEADER消息 case Leader.NEWLEADER: // Getting NEWLEADER here instead of in discovery // means this is Zab 1.0 // ...... //如果需要保存快照,此时将当前的内存Datatree结构保存到快照文件;真正执行快照保存的地方。 if (snapshotNeeded) { //对于TRUNC/SNAP。将当前的内存Datatree结构保存到快照文件,syncSnapshot用来控制写快照文件是同步/异步,对于SNAP是同步方式,对于TRUNC是异步方式。 //对于DIFF的强制保存快照的情况,也会保存一份快照,使用同步模式。syncSnapshot = true zk.takeSnapshot(syncSnapshot); } //更新follower的epoch self.setCurrentEpoch(newEpoch); // ..... 对于DIFF不保存快照的情况,确保将leader发送过来的事务记录到磁盘log文件中。 if (zk instanceof FollowerZooKeeperServer) { FollowerZooKeeperServer fzk = (FollowerZooKeeperServer) zk; for (PacketInFlight p : packetsNotCommitted) { //写磁盘事务log文件 fzk.logRequest(p.hdr, p.rec, p.digest); } //写完后情况packetsNotCommitted队列 packetsNotCommitted.clear(); } //发送ACK消息,该ACK对应NEWLEADER消息 writePacket(new QuorumPacket(Leader.ACK, newLeaderZxid, null, null), true); break; } } } //发送ACK给leader; //只有收到UPTODATE消息,才会退出上面的循环,因此:此时的ACK是回复UPTODATE消息此时的ACK是回复UPTODATE消息 ack.setZxid(ZxidUtils.makeZxid(newEpoch, 0)); writePacket(ack, true); //该节点开始正常服务 //...... }
不论是follower/observer在leader选举完成后数据同步阶段的事情差不多,因此都用follower代替。
follower-------->FOLLOWERINFO--------->leader
follower附带上自己的acceptEpoch。
leader----------->LEADERINFO-------------->follower
leader根据收到的acceptEpoch和自身的,得到newEpochZxid,发送给follower
follower-------->ACKEPOCH----------------->leader
follower收到newEpochZxid后,更新自己的acceptEpoch,然后回复ACKEPOCH,附带上其lastProcessedZxid。
当leader收到quorum的节点的ACKEPOCH后
peerLastZxid = ACKEPOCH.lastProcessedZxid ,设置peerLastZxid为follower的lastProcessedZxid minCommittedLog,maxCommittedLog:见2.关键对象
根据情况使用DIFF/TRUNC/SNAP数据同步方式
TRUNC消息会附带一个ZXid变量,该变量通知follower将其磁盘中的事务log,在Zxid之后的部分删除。 DIFF消息会附带一个Zxid变量,该变量通知follower数据同步到哪个Zxid。 具体可以分为3种情况: 1. peerLastZxid > maxCommittedLog:使用TRUNC:写入待发送队列-->[(TRUNC,maxCommittedLog)](待发送队列queuePacket) 2. peerLastZxid 在[minCommittedLog,maxCommittedLog]之间:使用committedLog中的事务数据同步follower。 如果leader有PeerLastZxid对应的事务:使用DIFF:写入待发送队列-->[(DIFF,maxCommittedLog)](待发送队列queuePacket) 如果leader没有PeerLastZxid对应的事务: 如果第一条大于PeerLastZxid的Zxid的epoch于PeerLastZxid的epoch不相同:使用SNAP //preProposalZxid见2.关键对象 上述不满足:使用TRUNC方式:写入待发送队列-->[(TRUNC,preProposalZxid)](待发送队列queuePacket) 3. peerLastZxid < minCommittedLog:使用磁盘事务log和内存committedLog中的事务数据同步follower。 内部执行同上面的情况2 4. 如果情况2和3使用不是SNAP: 将leader节点上(磁盘/内存中)Zxid在(PeerLastZxid,maxCommittedLog]中的每一个事务按序写入queuePacket //x为大于PeerLastZxid的最小Zxid,其实(T_Zxid)是一条PROPOSAL消息,真实应该写作(PROPOSAL,T_Zxid)此处简写 --->[(COMMIT),(T_maxCommittedLog),..,(COMMIT),(T_x+1),(COMMIT),(T_x), (TRUNC,peerLastZxid)/(DIFF,maxCommittedLog)](待发送队列queuePacket) 5. 如果情况2和3使用的是SNAP: 序列化leader的磁盘快照
注意:我们可以发现,除了第一种情况直接使用TRUNC外,其他情况下(2/3),三种方式都有可能使用;所以很多博客不知道是看的源码版本问题还是瞎写,对此处的理解有大问题。
比如: 很多书籍博客认为:情况3只会使用SNAP方式,显然错误 对于情况2:很多书籍认为不会使用SNAP方式,举个例子: //这里对于zxid,直接使用epoch_counter方式简化,具体zk的实现方式见2.关键对象 假设现在leader的committedLog种包含的事务的zxid为:1_1,1_2,2_1,2_2 follower的peerLastZxid = 1_3 这种情况书籍《从Paxos到ZooKeeper-分布式一致性原理与实践》认为使用TRUNC方式 由于peerLastZxid的epoch==1,leader种第一个大于peerLastZxid的zxid的epoch为2, 这两不相同(见LeaderHandler#queueCommittedProposals()),很明显使用SNAP方式。
在确定了同步方式后
leader-------------NEWLEADER-------------->follower
此时将NEWLEADER消息写入queuePacket //对于1情况下的队列内容 --->[(NEWLEADER),(TRUNC,maxCommittedLog)](待发送队列queuePacket) //对于2,3情况的TRUNC/DIFF方式,此时的队列内容 --->[(NEWLEADER),(COMMIT),(T_maxCommittedLog),..,(COMMIT),(T_x+1),(COMMIT),(T_x), (TRUNC,peerLastZxid)/(DIFF,maxCommittedLog)](待发送队列queuePacket) //对于SNAP,此时的队列内容 --->[(NEWLEADER),(SNAP)](待发送队列queuePacket) 然后leader将queuePacket中得消息依次发送给follower;对于SNAP,则还会发送快照。
follower--------------ACK----------------------->leader
follwer收到DIFF/TRUNC/SNAP消息后,分3种情况进行处理: 1. 收到了DIFF消息:如果DIFF消息后有事务数据(PROPOSAL消息),则将这些事务数据保存到一个内存队列中packetsNotCommitted; 收到COMMIT消息后,如果不需要保存快照,则将事务数据保存到packetsCommitted队列,需要保存快照则直接把该事务应用到内存datatree。 2. 收到了TRUNC消息:如果TRUNC消息后,首先会 关闭打开的事务log文件快照文件 删除磁盘事务log中在TRUNC消息中附带的zxid之后的事务 从快照和事务log中重新加载数据到内存datatree 之后如果有事务数据(PROPOSAL消息),则将这些事务数据保存到一个内存队列中packetsNotCommitted;收到COMMIT消息后,直接把该事务应 用到内存datatree(因为对于TRUNC方式和SNAP方式,会要求follower在事务数据应用到内存datatree后,保存内存快照到磁盘); 所以此处直接对应DIFF需要保存快照的情况。 3. 收到SNAP消息: 反序列化leader发来的快照数据,并应用于follower的内存datatree。 收到NEWLEADER消息后,上述步骤已经执行完,因为NEWLEADER消息是leader的queuePacket队列的最后一条消息: 如果需要在事务应用到内存Datatree后保存快照: 对于DIFF(配置中强制要求快照)/SNAP,使用同步方式保存快照,即只有快照保存完成后当前线程才会继续代码 对于TRUNC,使用异步方式保存快照,即开启一个线程保存快照,当前线程可以直接继续执行代码。 如果不需要在事务应用到内存Datatree后保存快照(只有DIFF(配置中不强制要求保存快照)方式满足): follower会将packetsNotCommitted中的事务数据写入磁盘事务log。 然后更新follower此时的currentEpoch = newEpoch NEWLEADER处理完成后,此时才发送ACK给leader
leader-------------->UPTODATE--------------->follower
leader收到quorum的节点发出的ACK消息: 修改leader的currentEpoch = newEpoch 然后发送UPTODATE给follower,通知follower可以正常处理客户端请求
follower------------->ACK------------------------>leader
follower收到UPTODATE消息后,将packetsCommitted中的事务应用到内存dataTree(对应于DIFF不需要保存快照的情况,在需要保存快照的情况和TRUN/SNAP同步方式下,follower早已将事务应用到内存datatree,此时packetsCommitted中无事务数据) 返回ACK给leader,此ACK leader不需要等所有quorum的节点都发送。 follower可以正常处理客户端请求。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)