2021SC@SDUSC
- Review
- Source code analysis
The previous post ended at this line:
queuedPackets.add(new QuorumPacket(Leader.UPTODATE, -1, null, null));
This post continues the analysis from there.
Source code analysis

```java
while (true) {
    qp = new QuorumPacket();
    ia.readRecord(qp, "packet");

    long traceMask = ZooTrace.SERVER_PACKET_TRACE_MASK;
    if (qp.getType() == Leader.PING) {
        traceMask = ZooTrace.SERVER_PING_TRACE_MASK;
    }
    if (LOG.isTraceEnabled()) {
        ZooTrace.logQuorumPacket(LOG, traceMask, 'i', qp);
    }
    // Every packet received pushes the ACK deadline forward by syncLimit ticks.
    tickOfNextAckDeadline = leader.self.tick.get() + leader.self.syncLimit;

    ByteBuffer bb;
    long sessionId;
    int cxid;
    int type;

    switch (qp.getType()) {
    case Leader.ACK:
        if (this.learnerType == LearnerType.OBSERVER) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Received ACK from Observer " + this.sid);
            }
        }
        syncLimitCheck.updateAck(qp.getZxid());
        leader.processAck(this.sid, qp.getZxid(), sock.getLocalSocketAddress());
        break;
    // (the excerpt ends here; the remaining cases are not shown)
```
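When the LearnerHandler receives an ACK, it passes the sender's sid and the zxid to leader.processAck(), which looks up the outstanding proposal for that zxid: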
```java
    Proposal p = outstandingProposals.get(zxid);
    if (p == null) {
        LOG.warn("Trying to commit future proposal: zxid 0x{} from {}",
                Long.toHexString(zxid), followerAddr);
        return;
    }

    // Record this server's vote for the proposal.
    p.ackSet.add(sid);
    if (LOG.isDebugEnabled()) {
        LOG.debug("Count for zxid: 0x{} is {}",
                Long.toHexString(zxid), p.ackSet.size());
    }
    if (self.getQuorumVerifier().containsQuorum(p.ackSet)) {
        if (zxid != lastCommitted + 1) {
            LOG.warn("Commiting zxid 0x{} from {} not first!",
                    Long.toHexString(zxid), followerAddr);
            LOG.warn("First is 0x{}", Long.toHexString(lastCommitted + 1));
        }
        outstandingProposals.remove(zxid);
        if (p.request != null) {
            toBeApplied.add(p);
        }

        if (p.request == null) {
            LOG.warn("Going to commmit null request for proposal: {}", p);
        }
        commit(zxid);
        inform(p);
        zk.commitProcessor.commit(p.request);
        if (pendingSyncs.containsKey(zxid)) {
            for (LearnerSyncRequest r : pendingSyncs.remove(zxid)) {
                sendSync(r);
            }
        }
    }
} // closes the enclosing method; its signature is not part of this excerpt
```
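Here the leader collects the followers' votes (ACKs) on the proposal. Once the ACK set forms a quorum, the proposal is removed from outstandingProposals and committed.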
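The vote-counting idea can be modeled in a few lines. The following is a minimal sketch, not ZooKeeper's code: `AckTracker`, `propose`, and the simple "more than half" rule are assumptions made for illustration, whereas the real leader delegates the quorum decision to a `QuorumVerifier`.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical, simplified model of majority-based ACK counting. */
class AckTracker {
    private final int ensembleSize;
    // zxid -> ids of the servers that have acknowledged that proposal
    private final Map<Long, Set<Long>> outstanding = new HashMap<>();
    private long lastCommitted;

    AckTracker(int ensembleSize, long lastCommitted) {
        this.ensembleSize = ensembleSize;
        this.lastCommitted = lastCommitted;
    }

    /** Registers a proposal that is waiting for ACKs. */
    void propose(long zxid) {
        outstanding.put(zxid, new HashSet<>());
    }

    /** Records an ACK; returns true only when this ACK completes a majority. */
    boolean processAck(long sid, long zxid) {
        Set<Long> acks = outstanding.get(zxid);
        if (acks == null) {
            return false; // unknown proposal, or one that was already committed
        }
        acks.add(sid); // a Set ignores duplicate ACKs from the same server
        if (acks.size() > ensembleSize / 2) {
            outstanding.remove(zxid);
            lastCommitted = zxid;
            return true;
        }
        return false;
    }

    long lastCommitted() {
        return lastCommitted;
    }
}
```

With an ensemble of five servers, the third distinct ACK for a zxid completes the majority; later ACKs for the same zxid find no outstanding proposal and are ignored, which mirrors the `p == null` early return in the excerpt above.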
```java
    if (zk instanceof FollowerZooKeeperServer) {
        FollowerZooKeeperServer fzk = (FollowerZooKeeperServer) zk;
        for (PacketInFlight p : packetsNotCommitted) {
            fzk.logRequest(p.hdr, p.rec);
        }
        for (Long zxid : packetsCommitted) {
            fzk.commit(zxid);
        }
    } else if (zk instanceof ObserverZooKeeperServer) {
        // Similar to follower, we need to log requests between the snapshot
        // and UPTODATE
        ObserverZooKeeperServer ozk = (ObserverZooKeeperServer) zk;
        for (PacketInFlight p : packetsNotCommitted) {
            Long zxid = packetsCommitted.peekFirst();
            if (p.hdr.getZxid() != zxid) {
                // log warning message if there is no matching commit
                // old leader send outstanding proposal to observer
                LOG.warn("Committing " + Long.toHexString(zxid)
                        + ", but next proposal is "
                        + Long.toHexString(p.hdr.getZxid()));
                continue;
            }
            packetsCommitted.remove();
            Request request = new Request(null, p.hdr.getClientId(),
                    p.hdr.getCxid(), p.hdr.getType(), null, null);
            request.txn = p.rec;
            request.hdr = p.hdr;
            ozk.commitRequest(request);
        }
    } else {
        // New server type need to handle in-flight packets
        throw new UnsupportedOperationException("Unknown server type");
    }
} // closes the enclosing method; its signature is not part of this excerpt
```
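This code checks whether the current server is a follower or an observer and handles the packets buffered during synchronization accordingly: a follower logs every uncommitted proposal and then commits each committed zxid, while an observer commits a proposal only if it matches the zxid at the head of the committed queue.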
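The difference between the two branches is easier to see on plain zxid lists. This is a hedged sketch with hypothetical names (`PendingReplay`, `replayAsFollower`, `replayAsObserver`); it models packets as bare zxids and, unlike the excerpt, treats an empty committed queue as "no match" instead of dereferencing a null head.

```java
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Hypothetical model of how buffered packets are replayed after sync. */
class PendingReplay {

    /** Follower style: log every pending proposal, then commit every committed zxid. */
    static List<String> replayAsFollower(List<Long> notCommitted, List<Long> committed) {
        List<String> actions = new ArrayList<>();
        for (long zxid : notCommitted) {
            actions.add("log:" + zxid);
        }
        for (long zxid : committed) {
            actions.add("commit:" + zxid);
        }
        return actions;
    }

    /** Observer style: apply a pending proposal only if it matches the next committed zxid. */
    static List<Long> replayAsObserver(List<Long> notCommitted, Deque<Long> committed) {
        List<Long> applied = new ArrayList<>();
        for (long zxid : notCommitted) {
            Long head = committed.peekFirst();
            if (head == null || head != zxid) {
                continue; // no matching commit for this proposal, skip it
            }
            committed.removeFirst();
            applied.add(zxid);
        }
        return applied;
    }
}
```

For example, with pending proposals 1, 2, 3 and committed zxids 1, 3, the observer-style replay applies 1 and 3 and skips 2, because 2 never reaches the head of the committed queue.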
```java
syncWithLeader(newEpochZxid);
QuorumPacket qp = new QuorumPacket();
while (self.isRunning()) {
    readPacket(qp);
    processPacket(qp);
}
```
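After synchronizing with the leader, the server loops while it is running: it reads each packet the leader sends, processes it, and stores the result.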
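The read-then-process loop can be sketched as a small dispatcher. Everything here is an assumption for illustration: `FollowLoop`, `Packet`, and the three packet types are hypothetical stand-ins, and the real `processPacket` handles more packet types and hands the work to the server's request processors.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;

/** Hypothetical model of a follower's read/process loop. */
class FollowLoop {
    enum Type { PROPOSAL, COMMIT, PING }

    static final class Packet {
        final Type type;
        final long zxid;

        Packet(Type type, long zxid) {
            this.type = type;
            this.zxid = zxid;
        }
    }

    /** Drains the packet source and returns the zxids committed, in order. */
    static List<Long> run(Iterator<Packet> source) {
        Deque<Long> pending = new ArrayDeque<>();   // proposals logged but not yet committed
        List<Long> committed = new ArrayList<>();
        while (source.hasNext()) {                  // stands in for self.isRunning()
            Packet p = source.next();               // stands in for readPacket(qp)
            switch (p.type) {                       // stands in for processPacket(qp)
            case PROPOSAL:
                pending.addLast(p.zxid);
                break;
            case COMMIT:
                // Commit only the oldest pending proposal, keeping zxid order.
                if (!pending.isEmpty() && pending.peekFirst() == p.zxid) {
                    committed.add(pending.removeFirst());
                }
                break;
            case PING:
                break; // liveness only, nothing to store
            }
        }
        return committed;
    }
}
```

Feeding the loop PROPOSAL 1, PING, PROPOSAL 2, COMMIT 1, COMMIT 2 yields the committed list 1, 2; a COMMIT that does not match the oldest pending proposal is ignored in this sketch.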
```java
protected Leader makeLeader(FileTxnSnapLog logFactory) throws IOException {
    return new Leader(this, new LeaderZooKeeperServer(logFactory, this,
            new ZooKeeperServer.BasicDataTreeBuilder(), this.zkDb));
}
```
This creates a Leader object backed by a LeaderZooKeeperServer, which represents the leader node's request-processing service.