zookeeper源码解析--请求处理--ProposalRequestProcessor

zookeeper源码解析--请求处理--ProposalRequestProcessor,第1张

zookeeper源码解析--请求处理--ProposalRequestProcessor ProposalRequestProcessor

SyncRequestProcessor
AckRequestProcessor

// Fragment of ProposalRequestProcessor's request handling, as quoted by the article.
// Learner sync requests are handed to the leader's processSync; any other request may be
// forwarded to nextProcessor and, when it carries a transaction header, is proposed
// through the leader and then passed to syncProcessor.
// If the request is a LearnerSyncRequest instance
if (request instanceof LearnerSyncRequest) 
{
	// The leader executes processSync
    zks.getLeader().processSync((LearnerSyncRequest) request);
    	// NOTE(review): the deeper-indented lines below look like the bodies of processSync
    	// and sendSync pasted inline by the article -- confirm against the real source.
    	// No proposals are waiting to complete
    	if (outstandingProposals.isEmpty()) 
        {
            sendSync(r);
            	QuorumPacket qp = new QuorumPacket(Leader.SYNC, 0, null, null);
    			r.fh.queuePacket(qp);
        } 
        else 
        {
        	// Proposals are still waiting to complete:
        	// park the sync request in the list keyed by the last proposed zxid
            pendingSyncs.computeIfAbsent(lastProposed, k -> new ArrayList<>()).add(r);
        }
} 
else 
{
	
    if (shouldForwardTonextProcessor(request)) 
    {
    	// Hand the request straight to the next processing stage
        nextProcessor.processRequest(request);
    }
    
    // The request carries a transaction header, i.e. it is a modifying request
    if (request.getHdr() != null) 
    {
        try 
        {
        	// Propose through the leader
            zks.getLeader().propose(request);
        }
        catch (XidRolloverException e) 
 		{
    		throw new RequestProcessorException(e.getMessage(), e);
		}
               	
        // After proposing, pass the request to the sync processor
        syncProcessor.processRequest(request);
    }
}
提议(Leader.propose)
// Fragment of the leader's propose logic, as quoted by the article: validates the request,
// wraps it in a Leader.PROPOSAL packet, records it as outstanding and sends it out.
// A throttled request must not be sent as a proposal: log the error and request a system exit.
if (request.isThrottled()) 
{
    LOG.error("Throttled request send as proposal: {}. Exiting.", request);
    ServiceUtils.requestSystemExit(ExitCode.UNEXPECTED_ERROR.getValue());
}

// The lower 32 bits of the zxid are all ones: shut down and report the rollover to the caller.
if ((request.zxid & 0xffffffffL) == 0xffffffffL) 
{
    String msg = 
    	"zxid lower 32 bits have rolled over, forcing re-election, and therefore new epoch start";
    shutdown(msg);
    throw new XidRolloverException(msg);
}

// Serialize the request object
byte[] data = SerializeUtils.serializeRequest(request);
// Record the serialized size in the proposal statistics
proposalStats.setLastBufferSize(data.length);
// Packet type: Leader.PROPOSAL
// Packet zxid: request.zxid
// Packet data: the serialized request
QuorumPacket pp = new QuorumPacket(Leader.PROPOSAL, request.zxid, data, null);
// The proposal object
Proposal p = new Proposal();
p.packet = pp;
p.request = request;
synchronized (this) 
{
	// The proposal also carries this peer's current quorum verifier
    p.addQuorumVerifier(self.getQuorumVerifier());
    if (request.getHdr().getType() == OpCode.reconfig) 
    {
        self.setLastSeenQuorumVerifier(request.qv, true);
    }

    // When the last-seen verifier has a higher version than the current one, add it as well
    if (self.getQuorumVerifier().getVersion() < self.getLastSeenQuorumVerifier().getVersion()) 
    {
        p.addQuorumVerifier(self.getLastSeenQuorumVerifier());
    }

    LOG.debug("Proposing:: {}", request);
    // Update the last proposed zxid
    lastProposed = p.packet.getZxid();
    // Register the proposal as outstanding under lastProposed
    outstandingProposals.put(lastProposed, p);
    // Send the proposal (per the article: to all followers/observers)
    sendPacket(pp);
}

	ServerMetrics.getMetrics().PROPOSAL_COUNT.add(1);
	return p;
} 
SyncRequestProcessor
// Fragment of SyncRequestProcessor's main loop, as quoted by the article: polls queued
// requests, appends them to the database, triggers snapshots and batches flushes.
// NOTE(review): the indented lines right after resetSnapshotStats() look like that method's
// body pasted inline by the article -- confirm against the real source.
resetSnapshotStats();
	// Random value
	randRoll = ThreadLocalRandom.current().nextInt(snapCount / 2);
	// Random value whose magnitude is below snapSizeInBytes / 2
    randSize = Math.abs(ThreadLocalRandom.current().nextLong() % (snapSizeInBytes / 2));
// Record the time
lastFlushTime = Time.currentElapsedTime();
while (true) 
{
    ServerMetrics.getMetrics().SYNC_PROCESSOR_QUEUE_SIZE.add(queuedRequests.size());
    // Poll timeout: the smaller of the max write-queue poll time and the remaining delay
    long pollTime = Math.min(zks.getMaxWriteQueuePollTime(), getRemainingDelay());
    // Take a request from the queue
    Request si = queuedRequests.poll(pollTime, TimeUnit.MILLISECONDS);
    if (si == null) 
    {
        // Nothing arrived within the timeout: flush, then block until a request arrives
        flush();
        si = queuedRequests.take();
    }

    // The sentinel request ends the loop
    if (si == REQUEST_OF_DEATH) 
    {
        break;
    }
	
	// Current time
    long startProcessTime = Time.currentElapsedTime();
    ServerMetrics.
    	getMetrics().SYNC_PROCESSOR_QUEUE_TIME.add(startProcessTime - si.syncQueueStartTime);
    // Append the request to the database (skipped for throttled requests)
    if (!si.isThrottled() && zks.getZKDatabase().append(si)) 
    {
    	// If enough transaction log has accumulated, it is time to snapshot the database
        if (shouldSnapshot()) 
        {
        	// Reset the snapshot statistics
            resetSnapshotStats();
            // Roll the current transaction log (per the article: flush its buffer to disk)
            zks.getZKDatabase().rollLog();
            if (!snapThreadMutex.tryAcquire()) 
            {
                LOG.warn("Too busy to snap, skipping");
            } 
            else 
            {
                new ZooKeeperThread("Snapshot Thread") 
                {
                    public void run() 
                    {
                        try 
                        {
                        	// Take the snapshot; snapThreadMutex was acquired above and is
                        	// released in the finally block
                            zks.takeSnapshot();
                        } 
                        catch (Exception e) 
                        {
                            LOG.warn("Unexpected exception", e);
                        } 
                        finally 
                        {
                            snapThreadMutex.release();
                        }
                    }
                }.start();
            }
        }
    } 
    // The request was not appended (throttled, or append returned false) and nothing is
    // waiting to be flushed: pass it straight to the next processor
    else if (toFlush.isEmpty()) 
    {
        if (nextProcessor != null) 
        {
            nextProcessor.processRequest(si);
            if (nextProcessor instanceof Flushable) 
            {
                ((Flushable) nextProcessor).flush();
            }
        }

        continue;
    }
    
    // Queue this request among those not yet flushed to disk
    toFlush.add(si);
    // If a flush is due now, flush the transaction log buffer to disk
    if (shouldFlush()) 
    {
        flush();
    }
    
    ServerMetrics.getMetrics().
    	SYNC_PROCESS_TIME.add(Time.currentElapsedTime() - startProcessTime);
}
append
// Fragment showing the append call chain, as quoted by the article: the database counts
// the transaction and delegates to snapLog, which delegates to txnLog.
// NOTE(review): the progressively indented lines look like the callee bodies pasted inline
// by the article -- confirm against the real source.
txnCount.incrementAndGet();
return this.snapLog.append(si);
	txnLog.append(si.getHdr(), si.getTxn(), si.getTxnDigest());
		// Only write requests have a transaction log entry; without a header nothing is written
		if (hdr == null) 
        {
            return false;
        }
        
        // Warn when the zxid does not advance; otherwise remember it as the last one seen
        if (hdr.getZxid() <= lastZxidSeen) 
        {
            LOG.warn(
            	"Current zxid {} is <= {} for {}", hdr.getZxid(), lastZxidSeen, Request.op2String(hdr.getType()));
        } 
        else 
        {
            lastZxidSeen = hdr.getZxid();
        }

        // No log stream is open yet: create a new log file named after this zxid
        if (logStream == null) 
        {
            LOG.info("Creating new log file: {}", Util.makeLogName(hdr.getZxid()));
            // The new transaction log file
            logFileWrite = new File(logDir, Util.makeLogName(hdr.getZxid()));
            fos = new FileOutputStream(logFileWrite);
            logStream = new BufferedOutputStream(fos);
            oa = BinaryOutputArchive.getArchive(logStream);
        	
        	// Write the header of the new file
            FileHeader fhdr = new FileHeader(TXNLOG_MAGIC, VERSION, dbId);
            fhdr.serialize(oa, "fileheader");
            logStream.flush();
            // Record the current file size (the channel position after the header)
            filePadding.setCurrentSize(fos.getChannel().position());
            // Writes to the current log file are not guaranteed to reach disk immediately,
            // so register the stream for a later flush
            streamsToFlush.add(fos);
        }

        filePadding.padFile(fos.getChannel());
        // Serialize the header, transaction and digest
        byte[] buf = Util.marshallTxnEntry(hdr, txn, digest);
        if (buf == null || buf.length == 0) 
        {
            throw new IOException("Faulty serialization for header " + "and txn");
        }
        // Write the CRC of the serialized entry
        Checksum crc = makeChecksumAlgorithm();
        crc.update(buf, 0, buf.length);
        oa.writeLong(crc.getValue(), "txnEntryCRC");
        // Write the transaction itself
        Util.writeTxnBytes(oa, buf);
        return true;
commit
// Fragment showing the commit call chain, as quoted by the article: snapLog delegates to
// txnLog, which flushes (and optionally forces) every pending stream.
// NOTE(review): the indented lines look like the callee bodies pasted inline by the
// article -- confirm against the real source.
this.snapLog.commit();
	txnLog.commit();
		// If a current transaction log stream exists, flush its buffer
		if (logStream != null) 
		{
		       logStream.flush();
		}
		   
		// For every stream that needs flushing
		for (FileOutputStream log : streamsToFlush) 
		{
		   	   // Flush the stream's buffer
		       log.flush();
		       // Controlled by forceSync (per the article: set from the environment)
		       if (forceSync) 
		       {
		       	   // Current time
		           long startSyncNS = System.nanoTime();
		           FileChannel channel = log.getChannel();
		           // Force the file content to the storage device
		           channel.force(false);
		           syncElapsedMS 
		           	= TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startSyncNS);
		           // The sync took longer than the warning threshold: count it and warn
		           if (syncElapsedMS > fsyncWarningThresholdMS) 
		           {
		               if (serverStats != null) 
		               {
		                   serverStats.incrementFsyncThresholdExceedCount();
		               }
		
		               // NOTE(review): the string literal below is line-wrapped by the article
		               // and would not compile as shown -- confirm against the real source.
		               LOG.warn("fsync-ing the write ahead log in {} took {}ms 
		               	which will adversely effect operation latency." + "File size is {} bytes. 
		               	See the ZooKeeper troubleshooting guide", Thread.currentThread().getName(), 
		               	syncElapsedMS, channel.size());
		           }
		
		           ServerMetrics.getMetrics().FSYNC_TIME.add(syncElapsedMS);
		       }
		}
		
		// After flushing, close every stream except the last remaining one
		while (streamsToFlush.size() > 1) 
		{
		       streamsToFlush.poll().close();
		}
		
		// When a size limit is configured and the current log exceeds it, roll the log
		if (txnLogSizeLimit > 0) 
		{
		       long logSize = getCurrentLogSize();
		       if (logSize > txnLogSizeLimit) 
		       {
		           LOG.debug("Log size limit reached: {}", logSize);
		           rollLog();
		       }
		}
flush
// Fragment of SyncRequestProcessor's flush logic, as quoted by the article: commits the
// database, then hands every pending request to the next processor in queue order.
// No transaction requests are waiting to be flushed: nothing to do
if (this.toFlush.isEmpty()) 
{
    return;
}

ServerMetrics.getMetrics().BATCH_SIZE.add(toFlush.size());
// Current time
long flushStartTime = Time.currentElapsedTime();
// Commit the database (per the article: flush buffers to disk)
zks.getZKDatabase().commit();
ServerMetrics.getMetrics().
	SYNC_PROCESSOR_FLUSH_TIME.add(Time.currentElapsedTime() - flushStartTime);
// Per the article, the next processor is usually the Ack processor
if (this.nextProcessor == null) 
{
    this.toFlush.clear();
} 
else 
{
	// The pending transaction requests have been committed in order,
	// so they can now be delivered in order to the next processor [durability]
    while (!this.toFlush.isEmpty()) 
    {
        final Request i = this.toFlush.remove();
        long latency = Time.currentElapsedTime() - i.syncQueueStartTime;
        ServerMetrics.getMetrics().SYNC_PROCESSOR_QUEUE_AND_FLUSH_TIME.add(latency);
        // A request reaches the next processor only after the commit above
        this.nextProcessor.processRequest(i);
    }
    
    // If the next processor supports flush, flush it too
    if (this.nextProcessor instanceof Flushable) 
    {
        ((Flushable) this.nextProcessor).flush();
    }
}

// Remember when this flush finished
lastFlushTime = Time.currentElapsedTime();
shouldFlush
// Fragment of shouldFlush, as quoted by the article: decides whether a flush is due.
// The decision depends on elapsed time and on the number of requests pending flush.
long flushDelay = zks.getFlushDelay();
long maxBatchSize = zks.getMaxBatchSize();
// A flush delay is configured and no delay remains: flush now
if ((flushDelay > 0) && (getRemainingDelay() == 0)) 
{
    return true;
}

// Otherwise flush only when a batch limit is configured and the pending queue has reached it
return (maxBatchSize > 0) && (toFlush.size() >= maxBatchSize);
AckRequestProcessor
// Fragment of AckRequestProcessor's request handling, as quoted by the article: reports
// an ack for the request's zxid to the leader under the local peer's own id.
QuorumPeer self = leader.self;
if (self != null) 
 {
     request.logLatency(ServerMetrics.getMetrics().PROPOSAL_ACK_CREATION_LATENCY);
     // The leader receives an ack for this zxid from itself (sid is the local peer's id)
     leader.processAck(self.getId(), request.zxid, null);
 } 
 else 
 {
     LOG.error("Null QuorumPeer");
 }
processAck
// Fragment of the leader's processAck logic, as quoted by the article: records an ack
// from server `sid` for proposal `zxid` and tries to commit the proposal.
// Committing is disabled: ignore the ack
if (!allowedToCommit) 
{
    return; 
}

if (LOG.isTraceEnabled()) 
{
    LOG.trace("Ack zxid: 0x{}", Long.toHexString(zxid));
    for (Proposal p : outstandingProposals.values()) 
    {
        long packetZxid = p.packet.getZxid();
        LOG.trace("outstanding proposal: 0x{}", Long.toHexString(packetZxid));
    }
    
    LOG.trace("outstanding proposals all");
}

// The lower 32 bits of the zxid are zero (per the article: the leader's first packet)
if ((zxid & 0xffffffffL) == 0) 
{
    return;
}

// No proposals are outstanding
if (outstandingProposals.size() == 0) 
{
    LOG.debug("outstanding is 0");
    return;
}

// This zxid is not beyond the last committed one
if (lastCommitted >= zxid) 
{
    LOG.debug("proposal has already been committed, pzxid: 0x{} zxid: 0x{}", 
    	Long.toHexString(lastCommitted), Long.toHexString(zxid));
    return;
}
// Look up the outstanding proposal associated with this zxid
Proposal p = outstandingProposals.get(zxid);
if (p == null) 
{
    LOG.warn("Trying to commit future proposal: zxid 0x{} from {}", 
    	Long.toHexString(zxid), followerAddr);
    return;
}

// Log ack latency only for zxids that are a multiple of ackLoggingFrequency
if (ackLoggingFrequency > 0 && (zxid % ackLoggingFrequency == 0)) 
{
    p.request.logLatency(ServerMetrics.getMetrics().ACK_LATENCY, Long.toString(sid));
}

// Record in the proposal that this member acked it
p.addAck(sid);
// Try to commit
boolean hasCommitted = tryToCommit(p, zxid, followerAddr);
// Reconfiguration handling (not covered by the article): after a committed reconfig,
// keep trying to commit the proposals with the following zxids
if (hasCommitted && p.request != null && p.request.getHdr().getType() == OpCode.reconfig) 
{
    long curZxid = zxid;
    while (allowedToCommit && hasCommitted && p != null) 
    {
        curZxid++;
        p = outstandingProposals.get(curZxid);
        if (p != null) 
        {
            hasCommitted = tryToCommit(p, curZxid, null);
        }
    }
}
tryToCommit
// Fragment of the leader's tryToCommit logic, as quoted by the article: commits proposal
// `p` for `zxid` once it is next in order and sufficiently acked, then releases any sync
// requests parked on that zxid. Returns false when the proposal cannot be committed yet.
// The previous zxid is still outstanding; to preserve ordering this zxid cannot be committed either
if (outstandingProposals.containsKey(zxid - 1)) 
{
    return false;
}

// Cannot commit until hasAllQuorums() holds (per the article: acks from a majority of voters)
if (!p.hasAllQuorums()) 
{
    return false;
}

// Committing out of the expected sequence only produces warnings
if (zxid != lastCommitted + 1) 
{
    LOG.warn("Commiting zxid 0x{} from {} not first!", Long.toHexString(zxid), followerAddr);
    LOG.warn("First is 0x{}", Long.toHexString(lastCommitted + 1));
}

// Remove the entry for this zxid from the outstanding proposals
outstandingProposals.remove(zxid);
if (p.request != null) 
{
    // Put the proposal into the toBeApplied container
    toBeApplied.add(p);
}

if (p.request == null) 
{
    LOG.warn("Going to commit null: {}", p);
} 
// Reconfiguration handling (not covered by the article)
else if (p.request.getHdr().getType() == OpCode.reconfig) 
{
    LOG.debug("Committing a reconfiguration! {}", outstandingProposals.size());
    Long designatedLeader = getDesignatedLeader(p, zxid);
    QuorumVerifier newQV = p.qvAcksetPairs.get(p.qvAcksetPairs.size() - 1).getQuorumVerifier();
    self.processReconfig(newQV, designatedLeader, zk.getZxid(), true);
    if (designatedLeader != self.getId()) 
    {
        // NOTE(review): the string literal below is line-wrapped by the article and would
        // not compile as shown -- confirm against the real source.
        LOG.info(String.format("Committing a reconfiguration (reconfigEnabled=%s); 
        	this leader is not the designated " + "leader anymore, setting allowedToCommit=false", 
        	self.isReconfigEnabled()));
        allowedToCommit = false;
    }

    commitAndActivate(zxid, designatedLeader);
    informAndActivate(p, designatedLeader);
} 
else 
{
    p.request.logLatency(ServerMetrics.getMetrics().QUORUM_ACK_LATENCY);
    // NOTE(review): the deeper-indented lines after commit(zxid) and inform(p) look like
    // those methods' bodies pasted inline by the article -- confirm against the real source.
    commit(zxid);
    	synchronized (this) 
        {
            lastCommitted = zxid;
        }
        
        QuorumPacket qp = new QuorumPacket(Leader.COMMIT, zxid, null, null);
        // Send a packet of type Leader.COMMIT carrying this zxid (per the article: to
        // every follower and observer)
        sendPacket(qp);
        ServerMetrics.getMetrics().COMMIT_COUNT.add(1);
    inform(p);
    	// Used to send the commit notification to observers
    	QuorumPacket qp = 
    		new QuorumPacket(Leader.INFORM, proposal.request.zxid, proposal.packet.getData(), null);
    	sendObserverPacket(qp);
}

// Per the article: for every write request the leader sends a proposal to itself and
// to each participant;
// once acks from a majority of itself and the participants are collected,
// it commits and notifies the participants to commit.
// Add the request to commitProcessor;
// commitProcessor commits the requests added to it one by one.
zk.commitProcessor.commit(p.request);
// If sync requests were parked on this zxid earlier
if (pendingSyncs.containsKey(zxid)) 
{
    for (LearnerSyncRequest r : pendingSyncs.remove(zxid)) 
    {
        // This zxid has now entered commit processing, so send SYNC to everyone waiting on it
        sendSync(r);
        	QuorumPacket qp = new QuorumPacket(Leader.SYNC, 0, null, null);
    		r.fh.queuePacket(qp);
    }
}

return true;

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5708953.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存