当然,Leader上的processor链依然是这几个:PrepRequestProcessor -> ProposalRequestProcessor -> CommitProcessor -> ToBeAppliedRequestProcessor -> FinalRequestProcessor。下面直接来分析其处理过程。
1.PrepRequestProcessor
public class PrepRequestProcessor extends ZooKeeperCriticalThread implements RequestProcessor {

    /**
     * Entry point for a single request: resets the txn fields, builds the
     * transaction for the request type (via pRequest2Txn), then passes the
     * request on to the next processor.
     */
    protected void pRequest(Request request) throws RequestProcessorException {
        request.hdr = null;
        request.txn = null;
        try {
            switch (request.type) {
                case OpCode.create:
                    CreateRequest createRequest = new CreateRequest();
                    pRequest2Txn(request.type, zks.getNextZxid(), request, createRequest, true);
                    break;
                ...
            }
        } catch (KeeperException e) {
            ...
        } catch (Exception e) {
            ...
        }
        // Finally hand the request to the next processor in the chain.
        request.zxid = zks.getZxid();
        nextProcessor.processRequest(request);
    }

    /**
     * Does the actual per-type work: creates the transaction header, validates
     * the request and fills in request.txn (only the create case is shown).
     */
    protected void pRequest2Txn(int type, long zxid, Request request, Record record, boolean deserialize)
            throws KeeperException, IOException, RequestProcessorException {
        request.hdr = new TxnHeader(request.sessionId, request.cxid, zxid,
                Time.currentWallTime(), type);

        switch (type) {
            case OpCode.create:
                zks.sessionTracker.checkSession(request.sessionId, request.getOwner());
                CreateRequest createRequest = (CreateRequest) record;
                if (deserialize) {
                    // Deserialize the client's request body into the CreateRequest object.
                    ByteBufferInputStream.byteBuffer2Record(request.request, createRequest);
                }

                // Path check: the path must contain a '/' and must not contain a NUL character.
                String path = createRequest.getPath();
                int lastSlash = path.lastIndexOf('/');
                if (lastSlash == -1 || path.indexOf('\0') != -1 || failCreate) {
                    LOG.info("Invalid path " + path + " with session 0x" + Long.toHexString(request.sessionId));
                    throw new KeeperException.BadArgumentsException(path);
                }

                // ACL check: de-duplicate the requested ACL list and validate it.
                List<ACL> listACL = removeDuplicates(createRequest.getAcl());
                if (!fixupACL(request.authInfo, listACL)) {
                    throw new KeeperException.InvalidACLException(path);
                }
                // The caller must hold CREATE permission on the parent node.
                String parentPath = path.substring(0, lastSlash);
                ChangeRecord parentRecord = getRecordForPath(parentPath);
                checkACL(zks, parentRecord.acl, ZooDefs.Perms.CREATE, request.authInfo);

                int parentCVersion = parentRecord.stat.getCversion();
                // Adjust the path according to the create mode: sequential nodes get the
                // parent's cversion appended as a zero-padded, 10-digit suffix.
                CreateMode createMode = CreateMode.fromFlag(createRequest.getFlags());
                if (createMode.isSequential()) {
                    path = path + String.format(Locale.ENGLISH, "%010d", parentCVersion);
                }
                validatePath(path, request.sessionId);
                try {
                    if (getRecordForPath(path) != null) {
                        throw new KeeperException.NodeExistsException(path);
                    }
                } catch (KeeperException.NoNodeException e) {
                    // ignore this one: no record for the path means the node does not exist yet
                }

                // Check whether the parent is an ephemeral node: ephemeral nodes cannot have children.
                boolean ephemeralParent = parentRecord.stat.getEphemeralOwner() != 0;
                if (ephemeralParent) {
                    throw new KeeperException.NoChildrenForEphemeralsException(path);
                }
                int newCversion = parentRecord.stat.getCversion() + 1;
                // Fill in request.txn; later processors in the chain use it.
                request.txn = new CreateTxn(path, createRequest.getData(), listACL,
                        createMode.isEphemeral(), newCversion);
                StatPersisted s = new StatPersisted();
                if (createMode.isEphemeral()) {
                    s.setEphemeralOwner(request.sessionId);
                }
                // Update the parent's stat (child count, cversion) and record the
                // pending changes for the parent and for the new node.
                parentRecord = parentRecord.duplicate(request.hdr.getZxid());
                parentRecord.childCount++;
                parentRecord.stat.setCversion(newCversion);
                addChangeRecord(parentRecord);
                addChangeRecord(new ChangeRecord(request.hdr.getZxid(), path, s, 0, listACL));
                break;
        }
        ...
    }
}
这里的处理与之前分析的单机版节点处理没有任何区别,主要是对ACL权限、路径等进行校验,之后交由ProposalRequestProcessor处理。
2.ProposalRequestProcessor
public class ProposalRequestProcessor implements RequestProcessor {
    public void processRequest(Request request) throws RequestProcessorException {
        // If the request came from a learner (a LearnerSyncRequest), hand it to
        // the leader's processSync.
        if(request instanceof LearnerSyncRequest){
            zks.getLeader().processSync((LearnerSyncRequest)request);
        } else {
            // Both transactional and non-transactional requests are passed on to
            // the next processor (CommitProcessor).
            nextProcessor.processRequest(request);
            // Transactional requests (non-null txn header) additionally go through a
            // proposal/vote round; this is where they differ from non-transactional ones.
            if (request.hdr != null) {
                try {
                    // Issue a proposal for this transaction (see 2.1).
                    zks.getLeader().propose(request);
                } catch (XidRolloverException e) {
                    throw new RequestProcessorException(e.getMessage(), e);
                }
                // Record this transaction in the transaction log; SyncProcessor was
                // analyzed earlier, so it is not repeated here.
                syncProcessor.processRequest(request);
            }
        }
    }
}
2.1 Leader针对事务请求发起propose
public class Leader {
    /**
     * Wraps a transactional request into a PROPOSAL packet, records it as an
     * outstanding proposal keyed by zxid, and sends it to the followers.
     */
    public Proposal propose(Request request) throws XidRolloverException {
        // Worth a look: if the lower 32 bits of the zxid are all ones, the counter
        // has rolled over. Instead of proposing, the leader shuts down to force a
        // re-election (and therefore a new epoch), as the message below says.
        if ((request.zxid & 0xffffffffL) == 0xffffffffL) {
            String msg = "zxid lower 32 bits have rolled over, forcing re-election, and therefore new epoch start";
            shutdown(msg);
            throw new XidRolloverException(msg);
        }
        byte[] data = SerializeUtils.serializeRequest(request);
        proposalStats.setLastProposalSize(data.length);
        // Wrap the serialized request in a packet of type PROPOSAL.
        QuorumPacket pp = new QuorumPacket(Leader.PROPOSAL, request.zxid, data, null);
        Proposal p = new Proposal();
        p.packet = pp;
        p.request = request;
        synchronized (this) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Proposing:: " + request);
            }
            // Keep the proposal in outstandingProposals (keyed by zxid) until
            // processAck sees a quorum of ACKs for it.
            lastProposed = p.packet.getZxid();
            outstandingProposals.put(lastProposed, p);
            // Finally, send the proposal packet to the followers.
            sendPacket(pp);
        }
        return p;
    }

    // Send the packet to every follower in forwardingFollowers.
    void sendPacket(QuorumPacket qp) {
        synchronized (forwardingFollowers) {
            for (LearnerHandler f : forwardingFollowers) {
                // Each LearnerHandler does the actual sending.
                f.queuePacket(qp);
            }
        }
    }
}
2.2 Leader发送proposal到followers
Leader将请求包装为proposal,最终交由LearnerHandler来发送。发送过程就是常规的网络发送,这里不再赘述;下面来看下leader接收follower响应(ACK)的相关逻辑:
public class LearnerHandler extends ZooKeeperThread {
    @Override
    public void run() {
        ...
        while (true) {
            qp = new QuorumPacket();
            ia.readRecord(qp, "packet");
            ByteBuffer bb;
            long sessionId;
            int cxid;
            int type;
            // Dispatch on the type of the response packet received from the learner.
            switch (qp.getType()) {
            // ACK: the follower has finished recording this request in its transaction log.
            case Leader.ACK:
                if (this.learnerType == LearnerType.OBSERVER) {
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Received ACK from Observer " + this.sid);
                    }
                }
                syncLimitCheck.updateAck(qp.getZxid());
                // Let the leader work out whether enough followers have returned an ACK.
                // NOTE(review): this passes the local socket address, although
                // processAck names that parameter followerAddr — confirm intent.
                leader.processAck(this.sid, qp.getZxid(), sock.getLocalSocketAddress());
                break;
            ...
            }
        }
    }
}
2.3 leader收集follower关于本次proposal的投票
public class Leader {
    /**
     * Records an ACK from server sid for proposal zxid; once a quorum has ACKed,
     * commits the proposal and hands the request to the CommitProcessor.
     */
    synchronized public void processAck(long sid, long zxid, SocketAddress followerAddr) {
        ...
        Proposal p = outstandingProposals.get(zxid);
        if (p == null) {
            LOG.warn("Trying to commit future proposal: zxid 0x{} from {}", Long.toHexString(zxid), followerAddr);
            return;
        }
        // Add the sid of the follower that sent this ACK to the proposal's ackSet.
        p.ackSet.add(sid);
        // Have enough followers returned an ACK (a quorum)?
        if (self.getQuorumVerifier().containsQuorum(p.ackSet)){
            if (zxid != lastCommitted+1) {
                LOG.warn("Commiting zxid 0x{} from {} not first!", Long.toHexString(zxid), followerAddr);
                LOG.warn("First is 0x{}", Long.toHexString(lastCommitted + 1));
            }
            outstandingProposals.remove(zxid);
            // The proposal has been accepted by a quorum and can be committed.
            // First add it to toBeApplied.
            if (p.request != null) {
                toBeApplied.add(p);
            }
            if (p.request == null) {
                LOG.warn("Going to commmit null request for proposal: {}", p);
            }
            // The leader sends a commit for this proposal to all followers.
            commit(zxid);
            // NOTE(review): inform(p) is not shown here; presumably it notifies observers — confirm.
            inform(p);
            // Add this request to CommitProcessor.committedRequests.
            zk.commitProcessor.commit(p.request);
            // Send out any sync requests that were pending on this zxid.
            if(pendingSyncs.containsKey(zxid)){
                for(LearnerSyncRequest r: pendingSyncs.remove(zxid)) {
                    sendSync(r);
                }
            }
        }
    }
}
3.CommitProcessor
既然主要的事情都由ProposalRequestProcessor做了,那CommitProcessor还做什么呢?
到目前为止,leader只是将事务请求记录到了事务日志中,并没有将其添加到当前的ZKDatabase中。那什么时候添加呢?最终会交由FinalRequestProcessor来添加。那添加的时机又是什么时候呢?这正是由CommitProcessor来把控的,也是它的主要作用所在。
public class CommitProcessor extends ZooKeeperCriticalThread implements RequestProcessor { // leader获取的请求集合 linkedListqueuedRequests = new linkedList (); // 已经被follower 提交的请求集合 linkedList committedRequests = new linkedList (); public void run() { try { Request nextPending = null; while (!finished) { int len = toProcess.size(); for (int i = 0; i < len; i++) { // 5.请求proposal已完成,交由下个processor处理即可 nextProcessor.processRequest(toProcess.get(i)); } toProcess.clear(); synchronized (this) { // 2.若没有收到足够的follower ack,则等待 if ((queuedRequests.size() == 0 || nextPending != null) && committedRequests.size() == 0) { wait(); continue; } // 3.committedRequests不为空,说明已经收到足够的follower ack,follower已经commit本次请求 if ((queuedRequests.size() == 0 || nextPending != null) && committedRequests.size() > 0) { Request r = committedRequests.remove(); if (nextPending != null && nextPending.sessionId == r.sessionId && nextPending.cxid == r.cxid) { nextPending.hdr = r.hdr; nextPending.txn = r.txn; nextPending.zxid = r.zxid; // 4.则针对leader而言,本次请求可以提交给下个processor处理 toProcess.add(nextPending); nextPending = null; } else { // this request came from someone else so just // send the commit packet toProcess.add(r); } } } // We haven't matched the pending requests, so go back to // waiting if (nextPending != null) { continue; } // 1.请求达到时,nextPending被设置为当前request,下次循环时会使用到 synchronized (this) { // Process the next requests in the queuedRequests while (nextPending == null && queuedRequests.size() > 0) { Request request = queuedRequests.remove(); switch (request.type) { case OpCode.create: case OpCode.delete: case OpCode.setdata: case OpCode.multi: case OpCode.setACL: case OpCode.createSession: case OpCode.closeSession: nextPending = request; break; case OpCode.sync: if (matchSyncs) { nextPending = request; } else { toProcess.add(request); } break; default: toProcess.add(request); } } } } } catch (InterruptedException e) { LOG.warn("Interrupted exception while waiting", e); } catch (Throwable e) { 
LOG.error("Unexpected exception causing CommitProcessor to exit", e); } LOG.info("CommitProcessor exited loop!"); } }
4.ToBeAppliedRequestProcessorstatic class ToBeAppliedRequestProcessor implements RequestProcessor { private RequestProcessor next; private ConcurrentlinkedQueuetoBeApplied; public void processRequest(Request request) throws RequestProcessorException { // request.addRQRec(">tobe"); next.processRequest(request); Proposal p = toBeApplied.peek(); if (p != null && p.request != null && p.request.zxid == request.zxid) { toBeApplied.remove(); } } }
5.FinalRequestProcessor
public class FinalRequestProcessor implements RequestProcessor {
    /**
     * Applies a transactional request to the in-memory database, records it as a
     * committed proposal, and builds the response (create case shown).
     */
    public void processRequest(Request request) {
        ProcessTxnResult rc = null;
        synchronized (zks.outstandingChanges) {
            ...
            if (request.hdr != null) {
                TxnHeader hdr = request.hdr;
                Record txn = request.txn;
                // This is where the node is actually created and added to the ZKDatabase.
                rc = zks.processTxn(hdr, txn);
            }
            // After the above, add this transactional request to the committed-proposal queue.
            if (Request.isQuorum(request.type)) {
                zks.getZKDatabase().addCommittedProposal(request);
            }
        }
        switch (request.type) {
        // For a create request, just return a CreateResponse.
        case OpCode.create: {
            lastOp = "CREA";
            rsp = new CreateResponse(rc.path);
            err = Code.get(rc.err);
            break;
        }
        }
    }
}
还是借用《从Paxos到Zookeeper:分布式一致性原理与实践》中的一张图来总结整个过程: