The previous article analyzed the Leader election process. Once election completes, each node is set to its corresponding state. As we know, a cluster has three roles, Leader, Follower and Observer, which map to LEADING, FOLLOWING and OBSERVING in the source code. Let's return to QuorumPeer's run method:
```java
public void run() {
    try {
        while (running) {
            switch (getPeerState()) {
            // start the Leader election process
            case LOOKING:
                setCurrentVote(makeLEStrategy().lookForLeader());
                break;
            case OBSERVING:
                setObserver(makeObserver(logFactory));
                observer.observeLeader();
                break;
            case FOLLOWING:
                setFollower(makeFollower(logFactory));
                follower.followLeader();
                break;
            case LEADING:
                setLeader(makeLeader(logFactory));
                leader.lead();
                setLeader(null);
                break;
            }
        }
    } finally {
        // shutdown handling omitted
    }
}
```
Next we'll analyze request processing under each of the three modes: OBSERVING, FOLLOWING and LEADING.
2. OBSERVING

OBSERVING corresponds to Observer nodes in the cluster. A node in this state neither runs for Leader nor votes. Write requests are forwarded to the Leader node, read requests are served directly from the node's local data, and the latest cluster data is synchronized down to the Observer.
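As a side note, an Observer is declared in the cluster configuration. The fragment below is an illustrative zoo.cfg sketch (hostnames and server ids are placeholders): the observer node itself sets peerType=observer, and its entry in the shared server list carries the :observer suffix so the voting members know not to count it.

```properties
# on the observer node itself (illustrative values)
peerType=observer

# server list shared by all nodes; server.4 is the observer
server.1=zoo1:2888:3888
server.2=zoo2:2888:3888
server.3=zoo3:2888:3888
server.4=zoo4:2888:3888:observer
```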
```java
// set the current node up as an Observer
setObserver(makeObserver(logFactory));
observer.observeLeader();
```
The makeObserver method returns an Observer instance whose constructor takes the QuorumPeer and an ObserverZooKeeperServer. From there we follow the observeLeader() method step by step:
```java
void observeLeader() throws Exception {
    long connectTime = 0;
    boolean completedSync = false;
    try {
        // set the current ZAB state to DISCOVERY
        self.setZabState(QuorumPeer.ZabState.DISCOVERY);
        // locate the Leader node
        QuorumServer master = findLearnerMaster();
        try {
            // connect to the Leader node
            connectToLeader(master.addr, master.hostname);
            connectTime = System.currentTimeMillis();
            // register with the Leader and get back a zxid carrying the new epoch
            long newLeaderZxid = registerWithLeader(Leader.OBSERVERINFO);
            // record the Leader's address and server id
            self.setLeaderAddressAndId(master.addr, master.getId());
            // move the ZAB state to SYNCHRONIZATION
            self.setZabState(QuorumPeer.ZabState.SYNCHRONIZATION);
            // synchronize the latest data from the Leader
            syncWithLeader(newLeaderZxid);
            // move the ZAB state to BROADCAST
            self.setZabState(QuorumPeer.ZabState.BROADCAST);
            completedSync = true;
            QuorumPacket qp = new QuorumPacket();
            // from here on, keep reading packets sent by the Leader and processing them
            while (this.isRunning() && nextLearnerMaster.get() == null) {
                readPacket(qp);
                processPacket(qp);
            }
        } catch (Exception e) {
            // error handling omitted
        }
    } finally {
        // cleanup omitted
    }
}
```
We now need to focus on three methods: registerWithLeader, syncWithLeader and processPacket.
registerWithLeader:
```java
protected long registerWithLeader(int pktType) throws IOException {
    // the latest transaction id logged on this node
    long lastLoggedZxid = self.getLastLoggedZxid();
    // build a request packet of type OBSERVERINFO carrying this node's accepted epoch
    QuorumPacket qp = new QuorumPacket();
    qp.setType(pktType);
    qp.setZxid(ZxidUtils.makeZxid(self.getAcceptedEpoch(), 0));
    // wrap this node's server id, protocol version and config version
    LearnerInfo li = new LearnerInfo(self.getId(), 0x10000, self.getQuorumVerifier().getVersion());
    ByteArrayOutputStream bsid = new ByteArrayOutputStream();
    BinaryOutputArchive boa = BinaryOutputArchive.getArchive(bsid);
    boa.writeRecord(li, "LearnerInfo");
    qp.setData(bsid.toByteArray());
    // send it to the Leader
    writePacket(qp, true);
    // read the Leader's reply
    readPacket(qp);
    // extract the Leader's epoch from the returned zxid
    final long newEpoch = ZxidUtils.getEpochFromZxid(qp.getZxid());
    if (qp.getType() == Leader.LEADERINFO) {
        // the Leader's protocol version
        leaderProtocolVersion = ByteBuffer.wrap(qp.getData()).getInt();
        byte[] epochBytes = new byte[4];
        final ByteBuffer wrappedEpochBytes = ByteBuffer.wrap(epochBytes);
        // if the Leader's epoch is newer than this node's, adopt it as the accepted epoch
        if (newEpoch > self.getAcceptedEpoch()) {
            wrappedEpochBytes.putInt((int) self.getCurrentEpoch());
            self.setAcceptedEpoch(newEpoch);
        } else if (newEpoch == self.getAcceptedEpoch()) {
            // equal epochs: nothing to do
            wrappedEpochBytes.putInt(-1);
        } else {
            // the Leader's epoch must never be older than ours
            throw new IOException("Leaders epoch, " + newEpoch + " is less than accepted epoch, " + self.getAcceptedEpoch());
        }
        // acknowledge the new epoch back to the Leader
        QuorumPacket ackNewEpoch = new QuorumPacket(Leader.ACKEPOCH, lastLoggedZxid, epochBytes, null);
        writePacket(ackNewEpoch, true);
        return ZxidUtils.makeZxid(newEpoch, 0);
    } else {
        if (newEpoch > self.getAcceptedEpoch()) {
            self.setAcceptedEpoch(newEpoch);
        }
        if (qp.getType() != Leader.NEWLEADER) {
            LOG.error("First packet should have been NEWLEADER");
            throw new IOException("First packet should have been NEWLEADER");
        }
        return qp.getZxid();
    }
}
```
registerWithLeader sends this node's information and its accepted epoch to the Leader, reads back the Leader's reply, and compares epochs: if the Leader's epoch is newer, the Observer adopts it as its own accepted epoch; if it is older, an exception is thrown. Finally the Observer replies with an ACKEPOCH packet carrying its last logged zxid, and the method returns a zxid built from the new epoch.
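As an aside, it helps to remember how a zxid is laid out: the high 32 bits hold the epoch and the low 32 bits hold a per-epoch counter, which is exactly what ZxidUtils.makeZxid and ZxidUtils.getEpochFromZxid compute above. A minimal standalone sketch of that packing (the demo class and sample values are mine, not ZooKeeper code):

```java
// Minimal sketch of zxid packing: high 32 bits = epoch, low 32 bits = counter.
// Mirrors what org.apache.zookeeper.server.util.ZxidUtils does.
public final class ZxidPackingDemo {

    static long makeZxid(long epoch, long counter) {
        return (epoch << 32L) | (counter & 0xffffffffL);
    }

    static long getEpochFromZxid(long zxid) {
        return zxid >> 32L;
    }

    static long getCounterFromZxid(long zxid) {
        return zxid & 0xffffffffL;
    }

    public static void main(String[] args) {
        long zxid = makeZxid(5, 42); // epoch 5, 42nd transaction in that epoch
        System.out.println(Long.toHexString(zxid));   // 50000002a
        System.out.println(getEpochFromZxid(zxid));   // 5
        System.out.println(getCounterFromZxid(zxid)); // 42
    }
}
```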
syncWithLeader:
This method synchronizes data from the Leader so the local node's state matches the Leader's (parts of the code are omitted).
```java
protected void syncWithLeader(long newLeaderZxid) throws Exception {
    // the ack packet we will eventually send back
    QuorumPacket ack = new QuorumPacket(Leader.ACK, 0, null, null);
    QuorumPacket qp = new QuorumPacket();
    long newEpoch = ZxidUtils.getEpochFromZxid(newLeaderZxid);
    QuorumVerifier newLeaderQV = null;
    boolean snapshotNeeded = true;
    boolean syncSnapshot = false;
    // registerWithLeader already wrote an ack to the Leader; now read its reply
    readPacket(qp);
    Deque<Long> packetsCommitted = new ArrayDeque<>();
    Deque<PacketInFlight> packetsNotCommitted = new ArrayDeque<>();
    synchronized (zk) {
        // the Leader answered DIFF
        if (qp.getType() == Leader.DIFF) {
            // stay in DIFF mode and decide whether an initial snapshot must be forced
            self.setSyncMode(QuorumPeer.SyncMode.DIFF);
            if (zk.shouldForceWriteInitialSnapshotAfterLeaderElection()) {
                snapshotNeeded = true;
                syncSnapshot = true;
            } else {
                snapshotNeeded = false;
            }
        } else if (qp.getType() == Leader.SNAP) {
            // the Leader answered SNAP
            self.setSyncMode(QuorumPeer.SyncMode.SNAP);
            // wipe the in-memory database and load the Leader's snapshot instead
            zk.getZKDatabase().deserializeSnapshot(leaderIs);
            // record the latest zxid
            zk.getZKDatabase().setlastProcessedZxid(qp.getZxid());
            syncSnapshot = true;
        } else if (qp.getType() == Leader.TRUNC) {
            // the Leader answered TRUNC
            self.setSyncMode(QuorumPeer.SyncMode.TRUNC);
            // truncate the local transaction log back to the Leader's zxid
            boolean truncated = zk.getZKDatabase().truncateLog(qp.getZxid());
            // record the latest zxid
            zk.getZKDatabase().setlastProcessedZxid(qp.getZxid());
        }
        zk.getZKDatabase().initConfigInZKDatabase(self.getQuorumVerifier());
        // create the session tracker
        zk.createSessionTracker();
        long lastQueued = 0;
        boolean isPreZAB1_0 = true;
        boolean writeToTxnLog = !snapshotNeeded;
        TxnLogEntry logEntry;
        outerLoop:
        while (self.isRunning()) {
            // in DIFF mode, keep reading the Leader's catch-up packets
            readPacket(qp);
            switch (qp.getType()) {
            // the Leader replays missed transactions to the Learner as proposals
            case Leader.PROPOSAL:
                PacketInFlight pif = new PacketInFlight();
                logEntry = SerializeUtils.deserializeTxn(qp.getData());
                // header/txn extraction omitted
                packetsNotCommitted.add(pif);
                break;
            // the Leader then keeps sending commits for those proposals
            case Leader.COMMIT:
            case Leader.COMMITANDACTIVATE:
                // take the oldest pending proposal and apply it via processTxn
                pif = packetsNotCommitted.peekFirst();
                if (!writeToTxnLog) {
                    if (pif.hdr.getZxid() != qp.getZxid()) {
                        LOG.warn("Committing 0x{}, but next proposal is 0x{}",
                                 Long.toHexString(qp.getZxid()),
                                 Long.toHexString(pif.hdr.getZxid()));
                    } else {
                        zk.processTxn(pif.hdr, pif.rec);
                        packetsNotCommitted.remove();
                    }
                } else {
                    packetsCommitted.add(qp.getZxid());
                }
                break;
            case Leader.INFORM:
            case Leader.INFORMANDACTIVATE:
                PacketInFlight packet = new PacketInFlight();
                // deserialization of the transaction omitted
                if (!writeToTxnLog) {
                    zk.processTxn(packet.hdr, packet.rec);
                } else {
                    packetsNotCommitted.add(packet);
                    packetsCommitted.add(qp.getZxid());
                }
                break;
            // data is now up to date with the Leader; leave the loop
            case Leader.UPTODATE:
                if (isPreZAB1_0) {
                    // take a snapshot
                    zk.takeSnapshot(syncSnapshot);
                    self.setCurrentEpoch(newEpoch);
                }
                self.setZooKeeperServer(zk);
                self.adminServer.setZooKeeperServer(zk);
                break outerLoop;
            case Leader.NEWLEADER:
                if (snapshotNeeded) {
                    zk.takeSnapshot(syncSnapshot);
                }
                self.setCurrentEpoch(newEpoch);
                writeToTxnLog = true;
                isPreZAB1_0 = false;
                sock.setSoTimeout(self.tickTime * self.syncLimit);
                self.setSyncMode(QuorumPeer.SyncMode.NONE);
                zk.startupWithoutServing();
                if (zk instanceof FollowerZooKeeperServer) {
                    FollowerZooKeeperServer fzk = (FollowerZooKeeperServer) zk;
                    for (PacketInFlight p : packetsNotCommitted) {
                        fzk.logRequest(p.hdr, p.rec, p.digest);
                    }
                    packetsNotCommitted.clear();
                }
                writePacket(new QuorumPacket(Leader.ACK, newLeaderZxid, null, null), true);
                break;
            }
        }
    }
    // ack the Leader
    ack.setZxid(ZxidUtils.makeZxid(newEpoch, 0));
    writePacket(ack, true);
    // mark zk as serving
    zk.startServing();
}
```
syncWithLeader synchronizes the latest data from the Leader. There are three synchronization modes: DIFF, SNAP and TRUNC.
- DIFF: the Leader replays the transactions the Observer is missing as Proposals, each followed by a commit instruction; when the Observer receives the commit, it applies and persists that transaction.
- SNAP: the Observer wipes its local data and loads a full snapshot of the Leader's data.
- TRUNC: the Observer holds transactions the Leader never committed, so it truncates its local transaction log back to the Leader's zxid and rebuilds its data from the remaining log.
This is the data synchronization mechanism an Observer runs against the Leader after service startup; the sketch below illustrates how the Leader picks one of the three modes.
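To make the choice concrete, here is a simplified, hypothetical sketch of the decision made on the Leader side, based on the learner's last zxid relative to the Leader's in-memory committed-log window. The real logic lives in LearnerHandler and handles more edge cases (for instance, peers only partially covered by the log):

```java
// Hypothetical simplification of how the Leader picks a sync mode.
// minCommittedLog / maxCommittedLog bound the Leader's in-memory committed log;
// leaderLastZxid is the Leader's last processed transaction.
enum SyncMode { DIFF, TRUNC, SNAP }

static SyncMode chooseSyncMode(long peerLastZxid,
                               long minCommittedLog,
                               long maxCommittedLog,
                               long leaderLastZxid) {
    if (peerLastZxid == leaderLastZxid) {
        return SyncMode.DIFF;   // already in sync: an empty DIFF
    }
    if (peerLastZxid > leaderLastZxid) {
        return SyncMode.TRUNC;  // learner is ahead: roll its log back
    }
    if (peerLastZxid >= minCommittedLog && peerLastZxid <= maxCommittedLog) {
        return SyncMode.DIFF;   // gap is covered by the in-memory log: replay proposals
    }
    return SyncMode.SNAP;       // too far behind: ship a full snapshot
}
```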
Once data synchronization completes, the Observer keeps receiving and processing packets from the Leader in a while loop:
```java
while (this.isRunning() && nextLearnerMaster.get() == null) {
    readPacket(qp);
    processPacket(qp);
}
```
Let's look mainly at the processPacket method:
```java
protected void processPacket(QuorumPacket qp) throws Exception {
    TxnLogEntry logEntry;
    TxnHeader hdr;
    TxnDigest digest;
    Record txn;
    switch (qp.getType()) {
    // the Leader keeps a heartbeat going with the Observer
    case Leader.PING:
        ping(qp);
        break;
    // proposals, commits and uptodate packets are ignored by an Observer
    case Leader.PROPOSAL:
        LOG.warn("Ignoring proposal");
        break;
    case Leader.COMMIT:
        LOG.warn("Ignoring commit");
        break;
    case Leader.UPTODATE:
        LOG.error("Received an UPTODATE message after Observer started");
        break;
    case Leader.REVALIDATE:
        // session revalidation
        revalidate(qp);
        break;
    case Leader.SYNC:
        // handle a sync request
        ((ObserverZooKeeperServer) zk).sync();
        break;
    case Leader.INFORM:
        // INFORM carries a committed transaction: wrap it as a Request and hand it to
        // the RequestProcessor chain; this is where the Observer applies the Leader's writes
        ServerMetrics.getMetrics().LEARNER_COMMIT_RECEIVED_COUNT.add(1);
        logEntry = SerializeUtils.deserializeTxn(qp.getData());
        hdr = logEntry.getHeader();
        txn = logEntry.getTxn();
        digest = logEntry.getDigest();
        Request request = new Request(hdr.getClientId(), hdr.getCxid(), hdr.getType(), hdr, txn, 0);
        request.logLatency(ServerMetrics.getMetrics().COMMIT_PROPAGATION_LATENCY);
        request.setTxnDigest(digest);
        ObserverZooKeeperServer obs = (ObserverZooKeeperServer) zk;
        obs.commitRequest(request);
        break;
    case Leader.INFORMANDACTIVATE:
        // handle a configuration change
        ByteBuffer buffer = ByteBuffer.wrap(qp.getData());
        long suggestedLeaderId = buffer.getLong();
        byte[] remainingdata = new byte[buffer.remaining()];
        buffer.get(remainingdata);
        logEntry = SerializeUtils.deserializeTxn(remainingdata);
        hdr = logEntry.getHeader();
        txn = logEntry.getTxn();
        digest = logEntry.getDigest();
        QuorumVerifier qv = self.configFromString(new String(((SetDataTxn) txn).getData(), UTF_8));
        request = new Request(hdr.getClientId(), hdr.getCxid(), hdr.getType(), hdr, txn, 0);
        request.setTxnDigest(digest);
        obs = (ObserverZooKeeperServer) zk;
        boolean majorChange = self.processReconfig(qv, suggestedLeaderId, qp.getZxid(), true);
        obs.commitRequest(request);
        if (majorChange) {
            throw new Exception("changes proposed in reconfig");
        }
        break;
    default:
        LOG.warn("Unknown packet type: {}", LearnerHandler.packetToString(qp));
        break;
    }
}
```
After the cluster starts, an Observer node first locates the Leader, registers itself with it and updates its own epoch, then synchronizes data until it matches the Leader, and finally loops, receiving the write transactions the Leader propagates and persisting them locally. The server container here is an ObserverZooKeeperServer, whereas in standalone mode, as we know, it is a ZooKeeperServer.
3. Receiving Client Requests

Once the cluster is up and data synchronization is done, the Observer starts its real work: besides handling data propagated from the Leader, it also accepts client requests. So what does that processing pipeline look like?
A client request is wrapped into a Request object and handed to the RequestProcessor chain:
```java
firstProcessor.processRequest(si);
```
Here firstProcessor is an ObserverRequestProcessor, whose next processor is a CommitProcessor, followed by a FinalRequestProcessor, so requests flow through them in that order. Let's focus on the processRequest methods of ObserverRequestProcessor and CommitProcessor.
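For orientation, below is a minimal sketch of how the Observer wires this chain up, modeled on ObserverZooKeeperServer.setupRequestProcessors; it is a simplification, and the real method can additionally start a SyncRequestProcessor when the Observer logs transactions locally:

```java
// Simplified sketch of ObserverZooKeeperServer.setupRequestProcessors:
// client requests enter at firstProcessor and flow
// ObserverRequestProcessor -> CommitProcessor -> FinalRequestProcessor.
protected void setupRequestProcessors() {
    RequestProcessor finalProcessor = new FinalRequestProcessor(this);
    commitProcessor = new CommitProcessor(finalProcessor,
            Long.toString(getServerId()), true, getZooKeeperServerListener());
    commitProcessor.start();
    firstProcessor = new ObserverRequestProcessor(this, commitProcessor);
    ((ObserverRequestProcessor) firstProcessor).start();
    // (the real method may also start a SyncRequestProcessor when the
    //  observer persists transactions locally; omitted here)
}
```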
ObserverRequestProcessor.processRequest adds the request to the queuedRequests blocking queue. Its run method takes requests off that queue, first calls the next processor's processRequest, and then, if the request is transactional, forwards it to the Leader, as sketched below. After that we need to analyze the handling inside CommitProcessor.
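A reduced sketch of that run loop, based on ObserverRequestProcessor in the ZooKeeper source (the real switch covers more op codes, such as create2, createSession and reconfig):

```java
// Reduced sketch of ObserverRequestProcessor.run: hand every request to the
// next processor, then forward transactional ones (and sync) to the Leader.
public void run() {
    try {
        while (!finished) {
            Request request = queuedRequests.take();
            // reads are answered locally by the downstream
            // CommitProcessor / FinalRequestProcessor
            nextProcessor.processRequest(request);
            switch (request.type) {
            case OpCode.sync:
                zks.pendingSyncs.add(request);
                zks.getObserver().request(request); // sync also goes through the Leader
                break;
            case OpCode.create:
            case OpCode.delete:
            case OpCode.setData:
            case OpCode.setACL:
            case OpCode.multi:
                zks.getObserver().request(request); // writes are shipped to the Leader
                break;
            default:
                break; // plain reads need no forwarding
            }
        }
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
}
```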
processRequest:
```java
public void processRequest(Request request) {
    if (stopped) {
        return;
    }
    request.commitProcQueueStartTime = Time.currentElapsedTime();
    // enqueue the request
    queuedRequests.add(request);
    // needCommit returns true for transactional operations, false otherwise
    if (needCommit(request)) {
        // writes also go onto the queuedWriteRequests queue
        queuedWriteRequests.add(request);
        numWriteQueuedRequests.incrementAndGet();
    } else {
        numReadQueuedRequests.incrementAndGet();
    }
    wakeup();
}
```
In the run method, a read request is simply taken off queuedRequests and handed to the FinalRequestProcessor, which responds to the client; we already covered this for standalone startup. A write request instead goes into the pendingRequests collection. Once the Leader has processed the write the Observer forwarded and the quorum has voted it through, the Leader sends the Observer an INFORM message; on receiving it, the Observer performs the commit, that is, it adds the request to committedRequests. The run method then matches entries in pendingRequests against committedRequests and, for each match, hands the committed write to the FinalRequestProcessor, which responds to the client.
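That matching step can be pictured with a hypothetical, heavily reduced helper; the real CommitProcessor.run additionally deals with per-session ordering, worker thread pools and read/write interleaving, so this models only the core idea (the helper name and parameters are mine):

```java
import java.util.Deque;
import java.util.Queue;

// Hypothetical, heavily reduced model of CommitProcessor's matching step:
// a pended local write is released downstream only when the matching
// committed request (same session id and cxid) arrives from the Leader.
private void matchAndCommit(Deque<Request> sessionPending,
                            Queue<Request> committedRequests,
                            RequestProcessor nextProcessor)
        throws RequestProcessor.RequestProcessorException {
    Request pending = sessionPending.peekFirst();
    Request committed = committedRequests.peek();
    if (pending != null && committed != null
            && pending.sessionId == committed.sessionId
            && pending.cxid == committed.cxid) {
        // same client operation: drop the local copy and forward the
        // committed transaction to FinalRequestProcessor
        sessionPending.removeFirst();
        committedRequests.poll();
        nextProcessor.processRequest(committed);
    }
}
```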
4. Summary

After the cluster starts, an Observer node registers itself with the Leader and synchronizes the Leader's data to stay consistent. When it receives a transactional client request, it forwards it to the Leader; once the request has passed the quorum vote, the Leader sends the Observer an INFORM message, and on receiving it the Observer performs the commit. A read request is answered directly from local data.
That's all. If anything above is incorrect, please leave a comment to point it out. Thanks for your understanding.