zookeeper Fast Leader Election(FLE)源码分析

zookeeper Fast Leader Election(FLE)源码分析,第1张

zookeeper Fast Leader Election(FLE)源码分析 zookeeper Fast Leader Election(FLE) leader选举阶段源码分析 1.关键对象

vote /notification:投票信息,注意vote和notification不一样,但是下面都用vote泛指,因为它们包含的主要内容一样:

id;表示该vote是谁投的
state ;投出这个vote的节点的状态。
proposedLeader ; 表示该vote给谁投(目标节点)。
proposedZxid;该节点【内存】dataTree上最大的zxid
proposedEpoch ;该节点【磁盘】上保存的currentEpoch
electionEpoch ;该节点【内存】中,每次节点准备选举leader,就会增加electionEpoch

recvset (类型:Map):用来保存同一个electionEpoch的vote。

该map可以保存同一个electionEpoch的来自任意状态(LOOKING/FOLLOWER/LEADER)节点的vote。如果当前节点收到一个拥有更大electionEpoch的vote,则会更新自己的electionEpoch。

outofelection(类型:Map):用来保存来自follower/leader节点的vote,该map中保存的vote的electionEpoch可以不同。

不论当前节点的vote的electionEpoch和其收到的来自follower/leader节点的vote的electionEpoch是否相同,都会将其保存到outofelection;
相比之下,recvset只会保存同一个electionEpoch的vote。
使用outofelection的目的是防止当前节点由于某种原因进入LOOKING状态,然后更新了electionEpoch,但此时集群中已经有quorum的节点处于follower/leadeer状态,即集群此时已经选举出新的leadr,由于当前节点的electionEpoch比集群中quorum的节点的都大;此时recvset中保存的vote不可能满足quorum的节点的vote一样,仅使用recvset不能选举出leader,因此这里使用了outofelection。

recvqueue(类型:双端队列):保存接受到的来自其他节点的vote。

logicalclock(类型:AtomicLong):保存当前节点的electionEpoch,每次开始leader选举都会首先自增该变量。

2.关键类和方法 FastLeaderElection.java
//这些方法都在FastLeaderElection.java中,zk版本3.7.0,这里只说明重要的部分;对于源码中原有的英文注释不删除,供参考。

public Vote lookForLeader() throws InterruptedException {
    // ......
    //在下面的源码分析中,不区分vote和notification对象,都当作vote对待,不影响FLE算法的主要思想理解。
    try {
        
        //用来保存收到的来自同一个logicalclock/electionEpoch的节点的vote;logicalclock和electionEpoch是同样的东西,只不过一个在当前节点内存中,一个是在vote中。
        Map recvset = new HashMap();

        
        
        Map outofelection = new HashMap();
		//超时时间:默认为200ms,作用见下。
        int notTimeout = minNotificationInterval;

        //初始化该节点的vote:注意此处不论是notification还是vote对象,一律使用vote代替,不影响算法的本质思路。
        //                  id = self.sid;表示该vote是谁投的
        //                  state = LOOKING
        //                  proposedLeader = self.sid; 表示该vote给谁投
        //                  proposedZxid = self.lastProcessedZxid;该节点【内存】dataTree上最大的zxid
        //                  proposedEpoch = self.currentEpoch;该节点【磁盘】上保存的currentEpoch
        //                  electionEpoch = logicalclock++;每次节点准备选举leader,就会增加electionEpoch
        synchronized (this) {
            logicalclock.incrementAndGet();
            updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
        }
        
        // ...... 
        
        //将该节点的vote发送给其他节点
        sendNotifications();
	   //......
        
        //循环直到当前节点选出了一个leader
        while ((self.getPeerState() == ServerState.LOOKING) && (!stop)) {
            
            //recvqueue队列保存从其他节点收到的votes
            //d出队列中的一个来自其他节点的vote
            Notification n = recvqueue.poll(notTimeout, TimeUnit.MILLISECONDS);

            
            //如果超时了了,还没有收到vote。
            if (n == null) {
                //不重要,主要想判断是不是和其他节点的网络连接有问题,导致超时了还没有收到其他节点的vote。
                if (manager.haveDelivered()) {//没问题,则继续发送该节点的vote给其他节点
                    sendNotifications();
                } else {//有问题,则重新建立连接
                    manager.connectAll();
                }
                
                //将超时时间扩大2倍,当然不能超过上界maxNotificationInterval = 60s
                int tmpTimeOut = notTimeout * 2;
                notTimeout = Math.min(tmpTimeOut, maxNotificationInterval);
                LOG.info("Notification time out: {}", notTimeout);
            } else if (validVoter(n.sid) && validVoter(n.leader)) {//如果收到了来自其他节点的有效vote。
                
                //n为该节点收到的其他节点的vote。
                switch (n.state) {
                    //如果vote n是由LOOKING状态的节点发出的,这些vote如果比当前节点的vote更好,当前节点会更新自己的vote;两张vote的更好是通过totalOrderPredicate()方法,比较的。详细见totalOrderPredicate分析。
                case LOOKING:
                    // ......
                    // If notification > current, replace and send messages out
                    //如果vote n中的electionEpoch > 该节点的vote的electionEpoch
                    if (n.electionEpoch > logicalclock.get()) {
                        //更新当前节点的electionEpoch
                        logicalclock.set(n.electionEpoch);
                        //清空recvset,因为recvset保存的都是同一个electionepoch的vote。
                        recvset.clear();
                        //如果vote n比当前节点的vote更好,则更新当前节点的vote = n。
                        if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {
                            updateProposal(n.leader, n.zxid, n.peerEpoch);
                        } else {
                            updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
                        }
                        //当前节点广播新的vote。
                        sendNotifications();
                    } 
                    //如果vote n中的electionEpoch < 该节点的vote的electionEpoch,则不做处理。
                    else if (n.electionEpoch < logicalclock.get()) {
                       // ...... 
                       //退出switch
                        break; 
                    } 
                    //如果vote n的electionEpoch == 当前节点的electionEpoch,并且n比当前节点的vote更好,则更新当前节点的vote,并广播。
                    else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, proposedLeader, proposedZxid, proposedEpoch)) {
                        updateProposal(n.leader, n.zxid, n.peerEpoch);
                        sendNotifications();
                    }
                 	// ......
                    // don't care about the version if it's in LOOKING state
                    //将vote n保存到recvset中
                    recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));
                    voteSet = getVoteTracker(recvset, new Vote(proposedLeader, proposedZxid, logicalclock.get(), proposedEpoch));
                    //判断recvset中,是否有quorum的节点和当前节点都投给同一个leader。
                    if (voteSet.hasAllQuorums()) {
                        // Verify if there is any change in the proposed leader
                        //
                        //如果有quorum的节点都和当前节点投了同一个leader,那么再等finalizeWait = 200毫秒,判断是否有比当前节点的vote更好的vote。
                        while ((n = recvqueue.poll(finalizeWait, TimeUnit.MILLISECONDS)) != null) {
                            //如果有,退出switch。
                            if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, proposedLeader, proposedZxid, proposedEpoch)) {
                                //重新放入recvqueue队列,使得下次循环可以继续取到该vote。
                                recvqueue.put(n);
                                break;
                            }
                        }

                        
                        //如果等待200毫秒都没要更好的vote,那么该节点就将其当前vote中的proposedLeader设置为它的leader;只要当前节点确认了新的leader,就会使用下述同样的步骤。
                        if (n == null) {
                            //判断当前节点是不是就是被选出来的leader。不是则将其状态设置为follower
                            setPeerState(proposedLeader, voteSet);
                            //设置当前节点最终的vote
                            Vote endVote = new Vote(proposedLeader, proposedZxid, logicalclock.get(), proposedEpoch);			
                            leaveInstance(endVote);
                            //返回当前节点最终的vote,退出FLE
                            return endVote;
                        }
                    }
                    break;
                    //如果收到的是来自OBSERVING节点的vote,则直接退出switch,因为在FLE中observer节点不能投票。
                case OBSERVING:
                    LOG.debug("Notification from observer: {}", n.sid);
                    break;
                    //如收到得是来自follower/leader状态的节点发出的vote
                case FOLLOWING:
                case LEADING:
                    

                    //此时只考虑vote n的electionEpoch和当前节点的electionEpoch一样的情况
                    if (n.electionEpoch == logicalclock.get()) {
                        //把vote n保存到recvset中
                        recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state));
                        voteSet = getVoteTracker(recvset, new Vote(n.version, n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state));
                        //已经有quorum的节点都投票给vote n中的n.leader,并且该leader确实处于leader状态
                        if (voteSet.hasAllQuorums() && checkLeader(recvset, n.leader, n.electionEpoch)) {
                            //和之前一样
                            setPeerState(n.leader, voteSet);
                            Vote endVote = new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch);
                            leaveInstance(endVote);
                            return endVote;
                        }
                    }

                    
                    //不论当前节点的vote的electionEpoch和n的electionEpoch是否相同,都会将其保存到outofelection,即当前leader选举和之前leader选举的选票都会保存到outofelection中
                    //相比之下,recvset只会保存同一个electionEpoch的vote。
                    //使用outofelection的目的是防止LOOKING状态的该节点的electionEpoch由于某种原因,进入了新的选举时期electionEpoch,即它的electionEpoch比集群中quorum的节点的都大
                    //此时recvset中的vote不可能满足quorum的节点的vote一样,仅使用recvset不能选举出leader,因此这里使用了outofelection。
                    outofelection.put(n.sid, new Vote(n.version, n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state));
                    voteSet = getVoteTracker(outofelection, new Vote(n.version, n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state));
                    //已经有quorum的节点都投票给vote n中的n.leader,并且该leader确实处于leader状态
                    if (voteSet.hasAllQuorums() && checkLeader(outofelection, n.leader, n.electionEpoch)) {
                        synchronized (this) {
                            //更新新的electionEpoch为当前leader的electionEpoch
                            logicalclock.set(n.electionEpoch);
                            //后面和之前一样
                            setPeerState(n.leader, voteSet);
                        }
                        Vote endVote = new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch);
                        leaveInstance(endVote);
                        return endVote;
                    }
                    break;
                default:
                    LOG.warn("Notification state unrecognized: {} (n.state), {}(n.sid)", n.state, n.sid);
                    break;
                }
            } else {
                if (!validVoter(n.leader)) {
                    LOG.warn("Ignoring notification for non-cluster member sid {} from sid {}", n.leader, n.sid);
                }
                if (!validVoter(n.sid)) {
                    LOG.warn("Ignoring notification for sid {} from non-quorum member sid {}", n.leader, n.sid);
                }
            }
        }
        return null;
    } 
    //......
}

//该方法用来在leader选举中比较2个vote中投的候选者节点哪个更好。
protected boolean totalOrderPredicate(long newId, long newZxid, long newEpoch, long curId, long curZxid, long curEpoch) {
        // ......

        
		//首先比较2个候选者的currentEpoch,currentEpoch大的更好。
    	//上述一样的情况下,比较2个候选者的lastProposedZxid,lastProposedZxid大的更好。
    	//上述一样的情况下,比较2个候选者的sid,sid大的更好。
        return ((newEpoch > curEpoch)
                || ((newEpoch == curEpoch)
                    && ((newZxid > curZxid)
                        || ((newZxid == curZxid)
                            && (newId > curId)))));
    }
3.总结 1.主要流程

FLE leader选举阶段的主要步骤是:

1.由于任意原因,某节点进入LOOKING状态,自增其electionEpoch,将vote首先投给自己,然后广播该vote 。

2.等待来自其他节点的vote,如果超时时间到了还没有收到来自其他节点的vote,判断是否因为网络连接有问题,有问题则重新建立连接,然后将超时时间扩大2倍。

3.任意节点N收到了vote x,如果节点N的状态是LOOKING

(1)如果vote x是由状态LOOKING的节点发出:

​ 保证recvset中保存的vote的electionEpoch都一样,如果vote x的electionEpoch > 节点N此时的electionEpoch,则清空recvset。

​ 使用候选者比较规则,比较2个vote中的候选者哪个更好,如果vote x更好,则更新自己的vote = x。

​ 节点N重新广播其选票信息。

 vote x的electionEpoch只要不小于节点N的electionEpoch则将**vote x保存到recvset**中。

​ 判断在recvset中是否有quorum的节点都和N投给同一个候选者,如果是,则节点N得到一个leader,选举结束并更新节点N的最终选票endVote。

(2)如果vote x是由状态FOLLOWER/LEAEDER的节点发出:

​ 比较vote x的electionEpoch是否和N的electionEpoch相同,相同则将vote x保存到recvset中,并判断在recvset中是否有quorum的节点都和vote x投给同一个候选者,如果是,则节点N得到一个leader,选举结束。

​ 将vote x保存到outofelection,并判断在outofelection中是否有quorum的节点都和vote x投给同一个候选者,如果是,则节点N得到一个leader,选举结束并更新节点N的最终选票endVote。

4.任意节点N收到了vote x,如果节点N的状态不是LOOKING,发送x的节点的状态是LOOKING,那么节点N广播它的endVote最终选票。

2.特点 1.候选者比较规则

首先比较2个候选者的currentEpoch,currentEpoch大的更好。

上述一样的情况下,比较2个候选者的lastProposedZxid,lastProposedZxid大的更好。

上述一样的情况下,比较2个候选者的sid,sid大的更好。

2.quorum机制 4.其他

对于FLE选举算法的具体特点要结合FLE数据同步阶段,论文中叫做Recovery阶段

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5696112.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存