public class FollowerZooKeeperServer extends LearnerZooKeeperServer {

    /**
     * Creates and starts the processors used by a follower.
     *
     * <p>The request pipeline is assembled back to front:
     * FollowerRequestProcessor -> CommitProcessor -> FinalRequestProcessor.
     * A SyncRequestProcessor is created separately and is constructed with a
     * SendAckRequestProcessor that wraps this server's follower.
     */
    protected void setupRequestProcessors() {
        // Tail of the request pipeline.
        RequestProcessor tail = new FinalRequestProcessor(this);

        // Middle stage: forwards to the tail.
        String serverIdLabel = Long.toString(getServerId());
        commitProcessor = new CommitProcessor(tail, serverIdLabel, true, getZooKeeperServerListener());
        commitProcessor.start();

        // Head of the request pipeline: forwards to the commit processor.
        FollowerRequestProcessor head = new FollowerRequestProcessor(this, commitProcessor);
        firstProcessor = head;
        head.start();

        // Separate sync processor, backed by a SendAckRequestProcessor.
        Learner learner = (Learner) getFollower();
        SendAckRequestProcessor ackSender = new SendAckRequestProcessor(learner);
        syncProcessor = new SyncRequestProcessor(this, ackSender);
        syncProcessor.start();
    }
}
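所以，客户端请求的处理链为 FollowerRequestProcessor -> CommitProcessor -> FinalRequestProcessor；此外还创建了一条独立的链 SyncRequestProcessor -> SendAckRequestProcessor，用于处理来自 leader 的 proposal（见 2.2.1、2.2.2）。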
2.FollowerRequestProcessorpublic class FollowerRequestProcessor extends ZooKeeperCriticalThread implements RequestProcessor { linkedBlockingQueue2.1 follower转发事务请求到leaderqueuedRequests = new linkedBlockingQueue (); // 从客户端获取的请求全部存入queuedRequests,后续通过run()方法调用执行 public void processRequest(Request request) { if (!finished) { queuedRequests.add(request); } } public void run() { try { while (!finished) { Request request = queuedRequests.take(); ... // 请求交由下一个processor(commitProcessor)处理 nextProcessor.processRequest(request); switch (request.type) { case OpCode.sync: zks.pendingSyncs.add(request); zks.getFollower().request(request); break; case OpCode.create: case OpCode.delete: case OpCode.setdata: case OpCode.setACL: case OpCode.createSession: case OpCode.closeSession: case OpCode.multi: // 有关于事务类型请求,直接交由leader处理,具体见2.1 zks.getFollower().request(request); break; } } } catch (Exception e) { handleException(this.getName(), e); } LOG.info("FollowerRequestProcessor exited loop!"); } }
public class Learner {

    /**
     * Serializes a request and passes it to {@code writePacket} as a
     * {@code Leader.REQUEST} packet.
     *
     * <p>Layout of the packet data: session id (long), cxid (int), type (int),
     * followed by the request's raw bytes when the request carries a buffer.
     *
     * @param request the request to serialize and send
     * @throws IOException propagated from the stream writes or from {@code writePacket}
     */
    void request(Request request) throws IOException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(buffer);

        // Fixed header fields.
        out.writeLong(request.sessionId);
        out.writeInt(request.cxid);
        out.writeInt(request.type);

        // Optional payload: copy the whole buffer, leaving its position rewound.
        if (request.request != null) {
            request.request.rewind();
            byte[] payload = new byte[request.request.remaining()];
            request.request.get(payload);
            request.request.rewind();
            out.write(payload);
        }
        out.close();

        byte[] packetData = buffer.toByteArray();
        QuorumPacket packet = new QuorumPacket(Leader.REQUEST, -1, packetData, request.authInfo);
        writePacket(packet, true);
    }
}
2.2 Follower.followLeader() follower启动
public class Follower extends Learner{

    /**
     * Main loop of a follower: finds the leader, connects and registers with
     * it, synchronizes, then reads and dispatches packets while running.
     *
     * <p>Parts of this method are elided ("...") in the article, including the
     * handlers that follow the try blocks.
     *
     * @throws InterruptedException declared by the method; the throwing code is
     *         not visible in this excerpt
     */
    void followLeader() throws InterruptedException {
        ...
        try {
            QuorumServer leaderServer = findLeader();
            try {
                // Open the connection to the leader.
                connectToLeader(leaderServer.addr, leaderServer.hostname);
                // Register this node's information with the leader.
                long newEpochZxid = registerWithLeader(Leader.FOLLOWERINFO);
                // NOTE(review): newEpoch is not used in the portion shown here.
                long newEpoch = ZxidUtils.getEpochFromZxid(newEpochZxid);
                // Synchronize data with the leader.
                syncWithLeader(newEpochZxid);
                QuorumPacket qp = new QuorumPacket();
                while (this.isRunning()) {
                    // Read the next packet from the leader and process it.
                    readPacket(qp);
                    processPacket(qp);
                }
            }
            ...
    }

    /**
     * Dispatches one packet read from the leader according to its type.
     *
     * @param qp the packet to process
     * @throws IOException declared by the method
     */
    protected void processPacket(QuorumPacket qp) throws IOException{
        switch (qp.getType()) {
        // A proposal sent by the leader.
        case Leader.PROPOSAL:
            TxnHeader hdr = new TxnHeader();
            Record txn = SerializeUtils.deserializeTxn(qp.getData(), hdr);
            // Warn (but keep going) when the zxid is not the one right after the last queued zxid.
            if (hdr.getZxid() != lastQueued + 1) {
                LOG.warn("Got zxid 0x" + Long.toHexString(hdr.getZxid()) + " expected 0x" + Long.toHexString(lastQueued + 1));
            }
            lastQueued = hdr.getZxid();
            // Log the transaction; see section 2.2.1.
            fzk.logRequest(hdr, txn);
            break;
        case Leader.COMMIT:
            // Per the article, the leader sends COMMIT to the followers once it has
            // collected enough acks; see section 2.2.3 (FollowerZooKeeperServer.commit()).
            fzk.commit(qp.getZxid());
            break;
        ...
        }
    }
}
2.2.1 FollowerZooKeeperServer.logRequest() 创建事务请求日志
public class FollowerZooKeeperServer extends LearnerZooKeeperServer { public void logRequest(TxnHeader hdr, Record txn) { Request request = new Request(null, hdr.getClientId(), hdr.getCxid(), hdr.getType(), null, null); request.hdr = hdr; request.txn = txn; request.zxid = hdr.getZxid(); if ((request.zxid & 0xffffffffL) != 0) { pendingTxns.add(request); } // 直接交由syncProcessor处理 syncProcessor.processRequest(request); } }
2.2.2 SendAckRequestProcessor 返回leader ack响应
public class SendAckRequestProcessor implements RequestProcessor, Flushable { public void processRequest(Request si) { if(si.type != OpCode.sync){ // 直接返回leader ack响应包 QuorumPacket qp = new QuorumPacket(Leader.ACK, si.hdr.getZxid(), null, null); try { learner.writePacket(qp, false); } catch (IOException e) { ... } } } }
2.2.3 FollowerZooKeeperServer.commit() 提交事务proposal
public class FollowerZooKeeperServer extends LearnerZooKeeperServer { public void commit(long zxid) { if (pendingTxns.size() == 0) { LOG.warn("Committing " + Long.toHexString(zxid) + " without seeing txn"); return; } long firstElementZxid = pendingTxns.element().zxid; if (firstElementZxid != zxid) { LOG.error("Committing zxid 0x" + Long.toHexString(zxid) + " but next pending txn 0x" + Long.toHexString(firstElementZxid)); System.exit(12); } Request request = pendingTxns.remove(); // 最终交由commitProcessor处理,详见3.1 commitProcessor.commit(request); } }
接收leader proposal请求,记录事务日志后,返回ack响应;
接收leader commit请求,将请求交由CommitProcessor处理;
3.CommitProcessor 3.1 CommitProcessor.commit() 提交leader事务proposalpublic class CommitProcessor extends ZooKeeperCriticalThread implements RequestProcessor { synchronized public void commit(Request request) { if (!finished) { if (request == null) { LOG.warn("Committed a null!", new Exception("committing a null! ")); return; } if (LOG.isDebugEnabled()) { LOG.debug("Committing request:: " + request); } // 很简单,直接将请求放入committedRequests committedRequests.add(request); notifyAll(); } } }
3.2 CommitProcessor.run() 处理请求public class CommitProcessor extends ZooKeeperCriticalThread implements RequestProcessor { // leader获取的请求集合 linkedListqueuedRequests = new linkedList (); // 已经被follower 提交的请求集合 linkedList committedRequests = new linkedList (); public void run() { try { Request nextPending = null; while (!finished) { int len = toProcess.size(); for (int i = 0; i < len; i++) { // 5.请求proposal已完成,交由下个processor处理即可 nextProcessor.processRequest(toProcess.get(i)); } toProcess.clear(); synchronized (this) { // 2.若没有收到请求且没有收到leader的commit请求,则等待 if ((queuedRequests.size() == 0 || nextPending != null) && committedRequests.size() == 0) { wait(); continue; } // 3.committedRequests不为空,说明当前follower已经接受到leader的commit请求 if ((queuedRequests.size() == 0 || nextPending != null) && committedRequests.size() > 0) { Request r = committedRequests.remove(); if (nextPending != null && nextPending.sessionId == r.sessionId && nextPending.cxid == r.cxid) { nextPending.hdr = r.hdr; nextPending.txn = r.txn; nextPending.zxid = r.zxid; // 4.本次请求可以提交给下个processor处理 toProcess.add(nextPending); nextPending = null; } else { // this request came from someone else so just // send the commit packet toProcess.add(r); } } } // We haven't matched the pending requests, so go back to // waiting if (nextPending != null) { continue; } // 1.请求达到时,nextPending被设置为当前request,下次循环时会使用到 synchronized (this) { // Process the next requests in the queuedRequests while (nextPending == null && queuedRequests.size() > 0) { Request request = queuedRequests.remove(); switch (request.type) { case OpCode.create: case OpCode.delete: case OpCode.setdata: case OpCode.multi: case OpCode.setACL: case OpCode.createSession: case OpCode.closeSession: nextPending = request; break; case OpCode.sync: if (matchSyncs) { nextPending = request; } else { toProcess.add(request); } break; default: toProcess.add(request); } } } } } catch (InterruptedException e) { LOG.warn("Interrupted exception while waiting", e); } catch (Throwable 
e) { LOG.error("Unexpected exception causing CommitProcessor to exit", e); } LOG.info("CommitProcessor exited loop!"); } }