Follower节点在集群中会参与投票以及Leader选举,同样会转发事务请求给Leader节点,然后通过Leader发起的投票进行表决,通过后提交当前事务。如果是写请求直接执行 *** 作。与Observer节点不一样的地方在于Follower会参与投票和Leader选举。
QuorumPeer的run方法中会根据当前的状态是FOLLOWING执行对应的 *** 作
setFollower(makeFollower(logFactory)); follower.followLeader();
followLeader()的处理流程和Observer中的处理流程一致,先找到Leader节点地址,把自己注册到Leader节点上,然后同步Leader节点上的数据保持与Leader数据一致性。接下来也是循环的读取和处理Leader的消息。主要的不同在于processPacket方法。
protected void processPacket(QuorumPacket qp) throws Exception { switch (qp.getType()) { case Leader.PING: //维持心跳 ping(qp); break; //如果Leader发送的消息PROPOSAL(提案) case Leader.PROPOSAL: ServerMetrics.getMetrics().LEARNER_PROPOSAL_RECEIVED_COUNT.add(1); TxnLogEntry logEntry = SerializeUtils.deserializeTxn(qp.getData()); TxnHeader hdr = logEntry.getHeader(); Record txn = logEntry.getTxn(); TxnDigest digest = logEntry.getDigest(); if (hdr.getZxid() != lastQueued + 1) { LOG.warn( "Got zxid 0x{} expected 0x{}", Long.toHexString(hdr.getZxid()), Long.toHexString(lastQueued + 1)); } lastQueued = hdr.getZxid(); if (hdr.getType() == OpCode.reconfig) { SetDataTxn setDataTxn = (SetDataTxn) txn; QuorumVerifier qv = self.configFromString(new String(setDataTxn.getData(), UTF_8)); self.setLastSeenQuorumVerifier(qv, true); } //针对当前PROPOSAL进行日志记录,但不提交到数据库中,并把当前的PROPOSAL添加到pendingTxns队列中, fzk.logRequest(hdr, txn, digest); break; case Leader.COMMIT: //如果是提交 *** 作 //执行提交 *** 作,也就是调用FinalRequestProcessor进行数据持久化 fzk.commit(qp.getZxid()); break; case Leader.COMMITANDACTIVATE: //执行config信息的提交 *** 作 fzk.commit(zxid); break; case Leader.UPTODATE: break; case Leader.RevalIDATE: if (om == null || !om.revalidateLearnerSession(qp)) { revalidate(qp); } break; case Leader.SYNC: //执行同步 *** 作 fzk.sync(); break; default: break; } }
processPacket处理逻辑是接收Leader的Proposal,进行日志记录,等待Leader的commit命令吗,执行提交 *** 作。
二、客户端请求经过之前的分析知道,客户端请求最终会提交给firstProcessor来处理,此时我们看看FollowerZooKeeperServer中是如何初始化Processor的。
protected void setupRequestProcessors() { //Final是执行持久化 *** 作 RequestProcessor finalProcessor = new FinalRequestProcessor(this); //CommitProcessor把当前的事务性 *** 作提交给Final对象 commitProcessor = new CommitProcessor(finalProcessor, Long.toString(getServerId()), true, getZooKeeperServerListener()); commitProcessor.start(); //FollowerRequestProcessor是处理入口 firstProcessor = new FollowerRequestProcessor(this, commitProcessor); ((FollowerRequestProcessor) firstProcessor).start(); //SyncRequestProcessor是针对Proposal进行日志记录,并通过SendAckRequestProcessor响应一个ACK给Leader syncProcessor = new SyncRequestProcessor(this, new SendAckRequestProcessor(getFollower())); syncProcessor.start(); }
此时FollowerRequestProcessor中的processRequest方法也是把请求添加到queuedRequests队列中,其run方法也是把当的请求转发给下一个Processor,并且针对事务性 *** 作转发给Leader节点,此时CommitProcessor *** 作和Observer中是一样的逻辑,就不进行分析。
四、总结Follower的处理流程和Observer的区别,从源码角度来说,他们都会把事务性请求转发给Leader节点,但是Observer节点不会接收到Leader节点发送的Proposal,也不参与Proposal的表决,只会接收Leader端的INFORM信息然后提交数据,Follower会接收来自Leader的Proposal,然后进行响应,服务端通过表决,发送commit命令给Follower,Follower收到后执行提交 *** 作,然后响应给Leader。
以上,有任何不对的地方,请留言指正,敬请谅解。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)