SyncRequestProcessor
AckRequestProcessor
// 如果请求是LearnerSyncRequest实例对象 if (request instanceof LearnerSyncRequest) { // 主节点执行processSync zks.getLeader().processSync((LearnerSyncRequest) request); // 没有等待完成的提议 if (outstandingProposals.isEmpty()) { sendSync(r); QuorumPacket qp = new QuorumPacket(Leader.SYNC, 0, null, null); r.fh.queuePacket(qp); } else { // 尚有等待完成的提议 // 最后的提议,阻塞等待的同步请求集合 pendingSyncs.computeIfAbsent(lastProposed, k -> new ArrayList<>()).add(r); } } else { if (shouldForwardTonextProcessor(request)) { // 对于直接用户,直接交由下一阶段处理 nextProcessor.processRequest(request); } // 请求包含事务头--修改类请求 if (request.getHdr() != null) { try { // 通过主节点进行提议 zks.getLeader().propose(request); } catch (XidRolloverException e) { throw new RequestProcessorException(e.getMessage(), e); } // 提议后,进行请求同步 syncProcessor.processRequest(request); } }提议
// Excerpt: body of the leader's propose(request) (signature not shown). Turns a
// write request into a PROPOSAL packet, registers it as outstanding, and sends it.

// A throttled request must never become a proposal; treat it as fatal.
if (request.isThrottled()) {
    LOG.error("Throttled request send as proposal: {}. Exiting.", request);
    ServiceUtils.requestSystemExit(ExitCode.UNEXPECTED_ERROR.getValue());
}
// The low 32 bits of the zxid are all ones (exhausted): shut down and throw, which
// forces a re-election and therefore a new epoch.
if ((request.zxid & 0xffffffffL) == 0xffffffffL) {
    String msg = "zxid lower 32 bits have rolled over, forcing re-election, and therefore new epoch start";
    shutdown(msg);
    throw new XidRolloverException(msg);
}
// Serialize the request.
byte[] data = SerializeUtils.serializeRequest(request);
// Record the serialized size in the proposal statistics.
proposalStats.setLastBufferSize(data.length);
// Packet type: Leader.PROPOSAL; zxid: request.zxid; payload: the serialized request.
QuorumPacket pp = new QuorumPacket(Leader.PROPOSAL, request.zxid, data, null);
// Build the proposal object.
Proposal p = new Proposal();
p.packet = pp;
p.request = request;
synchronized (this) {
    // Attach the current quorum verifier (cluster membership) to the proposal.
    p.addQuorumVerifier(self.getQuorumVerifier());
    // A reconfig request carries a new verifier; record it as the last seen one.
    if (request.getHdr().getType() == OpCode.reconfig) {
        self.setLastSeenQuorumVerifier(request.qv, true);
    }
    // If a newer (last seen) verifier exists, attach it as well, presumably so acks
    // are counted against both configurations — confirm.
    if (self.getQuorumVerifier().getVersion() < self.getLastSeenQuorumVerifier().getVersion()) {
        p.addQuorumVerifier(self.getLastSeenQuorumVerifier());
    }

    LOG.debug("Proposing:: {}", request);

    // Remember this zxid as the last proposed one.
    lastProposed = p.packet.getZxid();
    // Register the proposal as outstanding until it is committed.
    outstandingProposals.put(lastProposed, p);
    // Send the proposal (the original note says to all followers/observers;
    // sendPacket is not shown here — confirm the recipient set).
    sendPacket(pp);
}
ServerMetrics.getMetrics().PROPOSAL_COUNT.add(1);
return p;
} // closes the enclosing method, whose signature is not part of this excerpt
SyncRequestProcessor
// Excerpt: main loop of SyncRequestProcessor (enclosing method signature not shown).
// Appends requests to the transaction log, batches them in toFlush, flushes the
// batch when due, and occasionally starts a background snapshot.

resetSnapshotStats();
// Random value in [0, snapCount / 2), presumably so that servers do not all
// snapshot at the same moment — confirm.
randRoll = ThreadLocalRandom.current().nextInt(snapCount / 2);
// Random value below snapSizeInBytes / 2 (the size-based counterpart of randRoll).
randSize = Math.abs(ThreadLocalRandom.current().nextLong() % (snapSizeInBytes / 2));
// Start of the current flush window.
lastFlushTime = Time.currentElapsedTime();
while (true) {
    ServerMetrics.getMetrics().SYNC_PROCESSOR_QUEUE_SIZE.add(queuedRequests.size());
    // Wait no longer than the max write-queue poll time or the remaining flush
    // delay, whichever is shorter.
    long pollTime = Math.min(zks.getMaxWriteQueuePollTime(), getRemainingDelay());
    // Take the next request.
    Request si = queuedRequests.poll(pollTime, TimeUnit.MILLISECONDS);
    if (si == null) {
        // Queue idle: flush what is pending, then block until a request arrives.
        flush();
        si = queuedRequests.take();
    }
    // Sentinel request: leave the loop.
    if (si == REQUEST_OF_DEATH) {
        break;
    }
    long startProcessTime = Time.currentElapsedTime();
    ServerMetrics.getMetrics().SYNC_PROCESSOR_QUEUE_TIME.add(startProcessTime - si.syncQueueStartTime);
    // Append non-throttled requests to the database's transaction log.
    if (!si.isThrottled() && zks.getZKDatabase().append(si)) {
        // Enough has accumulated that a snapshot is due (shouldSnapshot is not shown).
        if (shouldSnapshot()) {
            resetSnapshotStats();
            // Roll the transaction log (the original note says this flushes the current
            // log file's buffer to disk; rollLog is not shown).
            zks.getZKDatabase().rollLog();
            // Only one snapshot at a time: skip if the mutex is already held.
            if (!snapThreadMutex.tryAcquire()) {
                LOG.warn("Too busy to snap, skipping");
            } else {
                new ZooKeeperThread("Snapshot Thread") {
                    public void run() {
                        try {
                            // Take the snapshot; snapThreadMutex is held until it finishes.
                            zks.takeSnapshot();
                        } catch (Exception e) {
                            LOG.warn("Unexpected exception", e);
                        } finally {
                            snapThreadMutex.release();
                        }
                    }
                }.start();
            }
        }
    }
    // Not appended (throttled, or append returned false) and nothing is waiting for a
    // flush: pass the request straight through to the next processor.
    else if (toFlush.isEmpty()) {
        if (nextProcessor != null) {
            nextProcessor.processRequest(si);
            if (nextProcessor instanceof Flushable) {
                ((Flushable) nextProcessor).flush();
            }
        }
        continue;
    }
    // Either the request was just appended, or earlier requests are still waiting for
    // a flush; queue it behind them so the order is preserved.
    toFlush.add(si);
    // Flush the transaction log (and release the batch) if a flush is due now.
    if (shouldFlush()) {
        flush();
    }
    ServerMetrics.getMetrics().SYNC_PROCESS_TIME.add(Time.currentElapsedTime() - startProcessTime);
}
append
// Excerpt: the append path, apparently three methods pasted back to back.
// NOTE(review): code after `return this.snapLog.append(si);` would be unreachable in a
// single method, so the lines below look like (1) the database's append(si),
// (2) snapLog.append(si) delegating to txnLog, and (3) the transaction log's
// append(hdr, txn, digest) — confirm against the real source.

// (1) Count the transaction and delegate to snapLog.
txnCount.incrementAndGet();
return this.snapLog.append(si);
// (2) Delegate to the transaction log with the request's header, txn and digest.
txnLog.append(si.getHdr(), si.getTxn(), si.getTxnDigest());
// (3) Only requests with a transaction header (writes) are logged.
if (hdr == null) {
    return false;
}
// zxids are expected to increase: warn on a non-increasing one, else remember it.
if (hdr.getZxid() <= lastZxidSeen) {
    LOG.warn(
        "Current zxid {} is <= {} for {}",
        hdr.getZxid(),
        lastZxidSeen,
        Request.op2String(hdr.getType()));
} else {
    lastZxidSeen = hdr.getZxid();
}
// No open log file: create one named after this zxid and write its header.
if (logStream == null) {
    LOG.info("Creating new log file: {}", Util.makeLogName(hdr.getZxid()));
    logFileWrite = new File(logDir, Util.makeLogName(hdr.getZxid()));
    fos = new FileOutputStream(logFileWrite);
    logStream = new BufferedOutputStream(fos);
    oa = BinaryOutputArchive.getArchive(logStream);
    // File header built from TXNLOG_MAGIC, VERSION and dbId.
    FileHeader fhdr = new FileHeader(TXNLOG_MAGIC, VERSION, dbId);
    fhdr.serialize(oa, "fileheader");
    logStream.flush();
    // Tell the padding helper where the file currently ends (just past the header).
    filePadding.setCurrentSize(fos.getChannel().position());
    // Writes go through a buffer and are not guaranteed to be on disk yet; remember
    // the stream so commit() can flush (and optionally fsync) it.
    streamsToFlush.add(fos);
}
// Pad the file as needed before writing (FilePadding is not shown).
filePadding.padFile(fos.getChannel());
// Serialize header, txn and digest into one entry.
byte[] buf = Util.marshallTxnEntry(hdr, txn, digest);
if (buf == null || buf.length == 0) {
    throw new IOException("Faulty serialization for header " + "and txn");
}
// Entry layout: CRC of the serialized bytes, then the bytes themselves.
Checksum crc = makeChecksumAlgorithm();
crc.update(buf, 0, buf.length);
oa.writeLong(crc.getValue(), "txnEntryCRC");
Util.writeTxnBytes(oa, buf);
return true;
commit
// Excerpt: the commit path.
// NOTE(review): the first two calls look like the database's commit() delegating to
// snapLog.commit(), which delegates to txnLog.commit(); the rest looks like the
// transaction log's commit() body — confirm against the real source.
this.snapLog.commit();
txnLog.commit();
// Push the current log file's buffered bytes down to the file stream.
if (logStream != null) {
    logStream.flush();
}
// Flush every stream that may still hold unsynced writes.
for (FileOutputStream log : streamsToFlush) {
    log.flush();
    // fsync only when forceSync is enabled (the original note says this depends on a
    // configuration/environment setting that is not shown here).
    if (forceSync) {
        long startSyncNS = System.nanoTime();
        FileChannel channel = log.getChannel();
        // Force the file content (force(false): content, not metadata) to storage.
        channel.force(false);
        syncElapsedMS = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startSyncNS);
        // Slow fsync: count it (when stats are available) and warn.
        if (syncElapsedMS > fsyncWarningThresholdMS) {
            if (serverStats != null) {
                serverStats.incrementFsyncThresholdExceedCount();
            }
            LOG.warn("fsync-ing the write ahead log in {} took {}ms which will adversely effect operation latency."
                    + "File size is {} bytes. See the ZooKeeper troubleshooting guide",
                Thread.currentThread().getName(),
                syncElapsedMS,
                channel.size());
        }
        ServerMetrics.getMetrics().FSYNC_TIME.add(syncElapsedMS);
    }
}
// After flushing, close all streams but the last one, presumably leaving only the
// current log file open (assumes streamsToFlush yields oldest-first — confirm).
while (streamsToFlush.size() > 1) {
    streamsToFlush.poll().close();
}
// With a size limit configured, roll to a new log file once the current one exceeds it.
if (txnLogSizeLimit > 0) {
    long logSize = getCurrentLogSize();
    if (logSize > txnLogSizeLimit) {
        LOG.debug("Log size limit reached: {}", logSize);
        rollLog();
    }
}
flush
// 没有等待要刷新到磁盘的事务请求--不处理 if (this.toFlush.isEmpty()) { return; } ServerMetrics.getMetrics().BATCH_SIZE.add(toFlush.size()); // 当前时间 long flushStartTime = Time.currentElapsedTime(); // 刷新缓冲到磁盘 zks.getZKDatabase().commit(); ServerMetrics.getMetrics(). SYNC_PROCESSOR_FLUSH_TIME.add(Time.currentElapsedTime() - flushStartTime); // 一般下一级为Ack if (this.nextProcessor == null) { this.toFlush.clear(); } else { // 由于积压的事务请求被顺序写入了磁盘 // 所以积压的事务请求现在可以顺序投递给下一级处理【持久性】 while (!this.toFlush.isEmpty()) { final Request i = this.toFlush.remove(); long latency = Time.currentElapsedTime() - i.syncQueueStartTime; ServerMetrics.getMetrics().SYNC_PROCESSOR_QUEUE_AND_FLUSH_TIME.add(latency); // 事务请求被写入磁盘后,才投递给下一级继续处理 this.nextProcessor.processRequest(i); } // 下一级支持flush的话,也flush if (this.nextProcessor instanceof Flushable) { ((Flushable) this.nextProcessor).flush(); } } lastFlushTime = Time.currentElapsedTime();shouldFlush
// 取决于时间&积压待刷新事务请求数量 long flushDelay = zks.getFlushDelay(); long maxBatchSize = zks.getMaxBatchSize(); if ((flushDelay > 0) && (getRemainingDelay() == 0)) { return true; } return (maxBatchSize > 0) && (toFlush.size() >= maxBatchSize);AckRequestProcessor
// Excerpt: AckRequestProcessor request handling (signature not shown). Records the
// leader's own ACK for the request, presumably after the leader's own log flush
// has handed the request here — confirm.
QuorumPeer self = leader.self;
if (self != null) {
    request.logLatency(ServerMetrics.getMetrics().PROPOSAL_ACK_CREATION_LATENCY);
    // The leader acks request.zxid on its own behalf: its own server id, and no
    // follower address (null).
    leader.processAck(self.getId(), request.zxid, null);
} else {
    LOG.error("Null QuorumPeer");
}
processAck
// Excerpt: the leader's processAck(sid, zxid, followerAddr) body (signature not shown;
// parameter names are taken from their use below). Records sid's ACK for zxid and
// tries to commit the corresponding proposal.

// Committing has been disabled (tryToCommit clears allowedToCommit after a reconfig
// that makes this leader non-designated): ignore acks.
if (!allowedToCommit) {
    return;
}
if (LOG.isTraceEnabled()) {
    LOG.trace("Ack zxid: 0x{}", Long.toHexString(zxid));
    for (Proposal p : outstandingProposals.values()) {
        long packetZxid = p.packet.getZxid();
        LOG.trace("outstanding proposal: 0x{}", Long.toHexString(packetZxid));
    }
    LOG.trace("outstanding proposals all");
}
// zxids whose low 32 bits are 0 are ignored (the original note says this is the
// leader's first packet — confirm).
if ((zxid & 0xffffffffL) == 0) {
    return;
}
// Nothing outstanding: this ack cannot complete anything.
if (outstandingProposals.size() == 0) {
    LOG.debug("outstanding is 0");
    return;
}
// Stale ack for a zxid that is already committed.
if (lastCommitted >= zxid) {
    LOG.debug("proposal has already been committed, pzxid: 0x{} zxid: 0x{}", Long.toHexString(lastCommitted), Long.toHexString(zxid));
    return;
}
// Look up the outstanding proposal for this zxid.
Proposal p = outstandingProposals.get(zxid);
if (p == null) {
    LOG.warn("Trying to commit future proposal: zxid 0x{} from {}", Long.toHexString(zxid), followerAddr);
    return;
}
if (ackLoggingFrequency > 0 && (zxid % ackLoggingFrequency == 0)) {
    p.request.logLatency(ServerMetrics.getMetrics().ACK_LATENCY, Long.toString(sid));
}
// Record that sid has acked this proposal.
p.addAck(sid);
// Try to commit now that the ack set has grown.
boolean hasCommitted = tryToCommit(p, zxid, followerAddr);
// Reconfiguration (not covered by the original article): after a committed reconfig,
// keep committing the following consecutive zxids. Stop when one is missing, cannot
// commit yet, or committing gets disabled.
if (hasCommitted && p.request != null && p.request.getHdr().getType() == OpCode.reconfig) {
    long curZxid = zxid;
    while (allowedToCommit && hasCommitted && p != null) {
        curZxid++;
        p = outstandingProposals.get(curZxid);
        if (p != null) {
            hasCommitted = tryToCommit(p, curZxid, null);
        }
    }
}
tryToCommit
// Excerpt: the leader's tryToCommit(p, zxid, followerAddr) body (signature not shown).
// Commits proposal p for zxid once it is next in order and has enough acks.

// The previous zxid is still outstanding. Commits must happen in zxid order, so this
// one has to wait.
if (outstandingProposals.containsKey(zxid - 1)) {
    return false;
}
// Not committable until p.hasAllQuorums() holds (the original note: acks from a
// majority of the voting members).
if (!p.hasAllQuorums()) {
    return false;
}
// Expected to be the zxid right after lastCommitted; warn (but continue) if not.
if (zxid != lastCommitted + 1) {
    LOG.warn("Commiting zxid 0x{} from {} not first!", Long.toHexString(zxid), followerAddr);
    LOG.warn("First is 0x{}", Long.toHexString(lastCommitted + 1));
}
// The proposal is no longer outstanding.
outstandingProposals.remove(zxid);
if (p.request != null) {
    // Queue the proposal on toBeApplied.
    toBeApplied.add(p);
}
if (p.request == null) {
    LOG.warn("Going to commit null: {}", p);
}
// Reconfiguration path (not covered by the original article).
else if (p.request.getHdr().getType() == OpCode.reconfig) {
    LOG.debug("Committing a reconfiguration! {}", outstandingProposals.size());
    Long designatedLeader = getDesignatedLeader(p, zxid);
    QuorumVerifier newQV = p.qvAcksetPairs.get(p.qvAcksetPairs.size() - 1).getQuorumVerifier();
    self.processReconfig(newQV, designatedLeader, zk.getZxid(), true);
    if (designatedLeader != self.getId()) {
        LOG.info(String.format("Committing a reconfiguration (reconfigEnabled=%s); this leader is not the designated "
                + "leader anymore, setting allowedToCommit=false", self.isReconfigEnabled()));
        allowedToCommit = false;
    }
    commitAndActivate(zxid, designatedLeader);
    informAndActivate(p, designatedLeader);
} else {
    p.request.logLatency(ServerMetrics.getMetrics().QUORUM_ACK_LATENCY);
    commit(zxid);
    // NOTE(review): the statements after commit(zxid) and after inform(p) look like
    // those methods' bodies pasted inline: `qp` is declared twice in this scope and
    // `proposal` is not declared in this excerpt. Confirm against the real source.
    synchronized (this) {
        lastCommitted = zxid;
    }
    // Send a Leader.COMMIT packet for zxid (the original note says to every follower
    // and observer; sendPacket is not shown — confirm the recipient set).
    QuorumPacket qp = new QuorumPacket(Leader.COMMIT, zxid, null, null);
    sendPacket(qp);
    ServerMetrics.getMetrics().COMMIT_COUNT.add(1);
    inform(p);
    // Notify observers with a Leader.INFORM packet carrying the proposal data.
    QuorumPacket qp = new QuorumPacket(Leader.INFORM, proposal.request.zxid, proposal.packet.getData(), null);
    sendObserverPacket(qp);
}
// For every write request, the leader sends a proposal to itself and to each
// participant. Once a quorum (itself included) has acked, it commits locally and tells
// the participants to commit. Here the request is handed to commitProcessor, which
// (per the original note) commits queued requests one at a time.
zk.commitProcessor.commit(p.request);
// Sync requests that were parked on this zxid can be answered now.
if (pendingSyncs.containsKey(zxid)) {
    for (LearnerSyncRequest r : pendingSyncs.remove(zxid)) {
        // This zxid is now being committed, so reply SYNC to each requester waiting on it.
        sendSync(r);
        QuorumPacket qp = new QuorumPacket(Leader.SYNC, 0, null, null);
        r.fh.queuePacket(qp);
    }
}
return true;
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)