vote /notification:投票信息,注意vote和notification不一样,但是下面都用vote泛指,因为它们包含的主要内容一样:
id;表示该vote是谁投的 state ;投出这个vote的节点的状态。 proposedLeader ; 表示该vote给谁投(目标节点)。 proposedZxid;该节点【内存】dataTree上最大的zxid proposedEpoch ;该节点【磁盘】上保存的currentEpoch electionEpoch ;该节点【内存】中,每次节点准备选举leader,就会增加electionEpoch
recvset (类型:Map
该map可以保存同一个electionEpoch的来自任意状态(LOOKING/FOLLOWER/LEADER)节点的vote。如果当前节点收到一个拥有更大electionEpoch的vote,则会更新自己的electionEpoch。
outofelection(类型:Map
不论当前节点的vote的electionEpoch和其收到的来自follower/leader节点的vote的electionEpoch是否相同,都会将其保存到outofelection; 相比之下,recvset只会保存同一个electionEpoch的vote。 使用outofelection的目的是防止当前节点由于某种原因进入LOOKING状态,然后更新了electionEpoch,但此时集群中已经有quorum的节点处于follower/leadeer状态,即集群此时已经选举出新的leadr,由于当前节点的electionEpoch比集群中quorum的节点的都大;此时recvset中保存的vote不可能满足quorum的节点的vote一样,仅使用recvset不能选举出leader,因此这里使用了outofelection。
recvqueue(类型:双端队列):保存接受到的来自其他节点的vote。
logicalclock(类型:AtomicLong):保存当前节点的electionEpoch,每次开始leader选举都会首先自增该变量。
2.关键类和方法 FastLeaderElection.java//这些方法都在FastLeaderElection.java中,zk版本3.7.0,这里只说明重要的部分;对于源码中原有的英文注释不删除,供参考。 public Vote lookForLeader() throws InterruptedException { // ...... //在下面的源码分析中,不区分vote和notification对象,都当作vote对待,不影响FLE算法的主要思想理解。 try { //用来保存收到的来自同一个logicalclock/electionEpoch的节点的vote;logicalclock和electionEpoch是同样的东西,只不过一个在当前节点内存中,一个是在vote中。 Map3.总结 1.主要流程recvset = new HashMap (); Map outofelection = new HashMap (); //超时时间:默认为200ms,作用见下。 int notTimeout = minNotificationInterval; //初始化该节点的vote:注意此处不论是notification还是vote对象,一律使用vote代替,不影响算法的本质思路。 // id = self.sid;表示该vote是谁投的 // state = LOOKING // proposedLeader = self.sid; 表示该vote给谁投 // proposedZxid = self.lastProcessedZxid;该节点【内存】dataTree上最大的zxid // proposedEpoch = self.currentEpoch;该节点【磁盘】上保存的currentEpoch // electionEpoch = logicalclock++;每次节点准备选举leader,就会增加electionEpoch synchronized (this) { logicalclock.incrementAndGet(); updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch()); } // ...... //将该节点的vote发送给其他节点 sendNotifications(); //...... //循环直到当前节点选出了一个leader while ((self.getPeerState() == ServerState.LOOKING) && (!stop)) { //recvqueue队列保存从其他节点收到的votes //d出队列中的一个来自其他节点的vote Notification n = recvqueue.poll(notTimeout, TimeUnit.MILLISECONDS); //如果超时了了,还没有收到vote。 if (n == null) { //不重要,主要想判断是不是和其他节点的网络连接有问题,导致超时了还没有收到其他节点的vote。 if (manager.haveDelivered()) {//没问题,则继续发送该节点的vote给其他节点 sendNotifications(); } else {//有问题,则重新建立连接 manager.connectAll(); } //将超时时间扩大2倍,当然不能超过上界maxNotificationInterval = 60s int tmpTimeOut = notTimeout * 2; notTimeout = Math.min(tmpTimeOut, maxNotificationInterval); LOG.info("Notification time out: {}", notTimeout); } else if (validVoter(n.sid) && validVoter(n.leader)) {//如果收到了来自其他节点的有效vote。 //n为该节点收到的其他节点的vote。 switch (n.state) { //如果vote n是由LOOKING状态的节点发出的,这些vote如果比当前节点的vote更好,当前节点会更新自己的vote;两张vote的更好是通过totalOrderPredicate()方法,比较的。详细见totalOrderPredicate分析。 case LOOKING: // ...... // If notification > current, replace and send messages out //如果vote n中的electionEpoch > 该节点的vote的electionEpoch if (n.electionEpoch > logicalclock.get()) { //更新当前节点的electionEpoch logicalclock.set(n.electionEpoch); //清空recvset,因为recvset保存的都是同一个electionepoch的vote。 recvset.clear(); //如果vote n比当前节点的vote更好,则更新当前节点的vote = n。 if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) { updateProposal(n.leader, n.zxid, n.peerEpoch); } else { updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch()); } //当前节点广播新的vote。 sendNotifications(); } //如果vote n中的electionEpoch < 该节点的vote的electionEpoch,则不做处理。 else if (n.electionEpoch < logicalclock.get()) { // ...... //退出switch break; } //如果vote n的electionEpoch == 当前节点的electionEpoch,并且n比当前节点的vote更好,则更新当前节点的vote,并广播。 else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, proposedLeader, proposedZxid, proposedEpoch)) { updateProposal(n.leader, n.zxid, n.peerEpoch); sendNotifications(); } // ...... // don't care about the version if it's in LOOKING state //将vote n保存到recvset中 recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch)); voteSet = getVoteTracker(recvset, new Vote(proposedLeader, proposedZxid, logicalclock.get(), proposedEpoch)); //判断recvset中,是否有quorum的节点和当前节点都投给同一个leader。 if (voteSet.hasAllQuorums()) { // Verify if there is any change in the proposed leader // //如果有quorum的节点都和当前节点投了同一个leader,那么再等finalizeWait = 200毫秒,判断是否有比当前节点的vote更好的vote。 while ((n = recvqueue.poll(finalizeWait, TimeUnit.MILLISECONDS)) != null) { //如果有,退出switch。 if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, proposedLeader, proposedZxid, proposedEpoch)) { //重新放入recvqueue队列,使得下次循环可以继续取到该vote。 recvqueue.put(n); break; } } //如果等待200毫秒都没要更好的vote,那么该节点就将其当前vote中的proposedLeader设置为它的leader;只要当前节点确认了新的leader,就会使用下述同样的步骤。 if (n == null) { //判断当前节点是不是就是被选出来的leader。不是则将其状态设置为follower setPeerState(proposedLeader, voteSet); //设置当前节点最终的vote Vote endVote = new Vote(proposedLeader, proposedZxid, logicalclock.get(), proposedEpoch); leaveInstance(endVote); //返回当前节点最终的vote,退出FLE return endVote; } } break; //如果收到的是来自OBSERVING节点的vote,则直接退出switch,因为在FLE中observer节点不能投票。 case OBSERVING: LOG.debug("Notification from observer: {}", n.sid); break; //如收到得是来自follower/leader状态的节点发出的vote case FOLLOWING: case LEADING: //此时只考虑vote n的electionEpoch和当前节点的electionEpoch一样的情况 if (n.electionEpoch == logicalclock.get()) { //把vote n保存到recvset中 recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state)); voteSet = getVoteTracker(recvset, new Vote(n.version, n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state)); //已经有quorum的节点都投票给vote n中的n.leader,并且该leader确实处于leader状态 if (voteSet.hasAllQuorums() && checkLeader(recvset, n.leader, n.electionEpoch)) { //和之前一样 setPeerState(n.leader, voteSet); Vote endVote = new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch); leaveInstance(endVote); return endVote; } } //不论当前节点的vote的electionEpoch和n的electionEpoch是否相同,都会将其保存到outofelection,即当前leader选举和之前leader选举的选票都会保存到outofelection中 //相比之下,recvset只会保存同一个electionEpoch的vote。 //使用outofelection的目的是防止LOOKING状态的该节点的electionEpoch由于某种原因,进入了新的选举时期electionEpoch,即它的electionEpoch比集群中quorum的节点的都大 //此时recvset中的vote不可能满足quorum的节点的vote一样,仅使用recvset不能选举出leader,因此这里使用了outofelection。 outofelection.put(n.sid, new Vote(n.version, n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state)); voteSet = getVoteTracker(outofelection, new Vote(n.version, n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state)); //已经有quorum的节点都投票给vote n中的n.leader,并且该leader确实处于leader状态 if (voteSet.hasAllQuorums() && checkLeader(outofelection, n.leader, n.electionEpoch)) { synchronized (this) { //更新新的electionEpoch为当前leader的electionEpoch logicalclock.set(n.electionEpoch); //后面和之前一样 setPeerState(n.leader, voteSet); } Vote endVote = new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch); leaveInstance(endVote); return endVote; } break; default: LOG.warn("Notification state unrecognized: {} (n.state), {}(n.sid)", n.state, n.sid); break; } } else { if (!validVoter(n.leader)) { LOG.warn("Ignoring notification for non-cluster member sid {} from sid {}", n.leader, n.sid); } if (!validVoter(n.sid)) { LOG.warn("Ignoring notification for sid {} from non-quorum member sid {}", n.leader, n.sid); } } } return null; } //...... } //该方法用来在leader选举中比较2个vote中投的候选者节点哪个更好。 protected boolean totalOrderPredicate(long newId, long newZxid, long newEpoch, long curId, long curZxid, long curEpoch) { // ...... //首先比较2个候选者的currentEpoch,currentEpoch大的更好。 //上述一样的情况下,比较2个候选者的lastProposedZxid,lastProposedZxid大的更好。 //上述一样的情况下,比较2个候选者的sid,sid大的更好。 return ((newEpoch > curEpoch) || ((newEpoch == curEpoch) && ((newZxid > curZxid) || ((newZxid == curZxid) && (newId > curId))))); }
FLE leader选举阶段的主要步骤是:
1.由于任意原因,某节点进入LOOKING状态,自增其electionEpoch,将vote首先投给自己,然后广播该vote 。
2.等待来自其他节点的vote,如果超时时间到了还没有收到来自其他节点的vote,判断是否因为网络连接有问题,有问题则重新建立连接,然后将超时时间扩大2倍。
3.任意节点N收到了vote x,如果节点N的状态是LOOKING
(1)如果vote x是由状态LOOKING的节点发出:
保证recvset中保存的vote的electionEpoch都一样,如果vote x的electionEpoch > 节点N此时的electionEpoch,则清空recvset。
使用候选者比较规则,比较2个vote中的候选者哪个更好,如果vote x更好,则更新自己的vote = x。
节点N重新广播其选票信息。
vote x的electionEpoch只要不小于节点N的electionEpoch则将**vote x保存到recvset**中。
判断在recvset中是否有quorum的节点都和N投给同一个候选者,如果是,则节点N得到一个leader,选举结束并更新节点N的最终选票endVote。
(2)如果vote x是由状态FOLLOWER/LEAEDER的节点发出:
比较vote x的electionEpoch是否和N的electionEpoch相同,相同则将vote x保存到recvset中,并判断在recvset中是否有quorum的节点都和vote x投给同一个候选者,如果是,则节点N得到一个leader,选举结束。
将vote x保存到outofelection,并判断在outofelection中是否有quorum的节点都和vote x投给同一个候选者,如果是,则节点N得到一个leader,选举结束并更新节点N的最终选票endVote。
4.任意节点N收到了vote x,如果节点N的状态不是LOOKING,发送x的节点的状态是LOOKING,那么节点N广播它的endVote最终选票。
2.特点 1.候选者比较规则首先比较2个候选者的currentEpoch,currentEpoch大的更好。
上述一样的情况下,比较2个候选者的lastProposedZxid,lastProposedZxid大的更好。
上述一样的情况下,比较2个候选者的sid,sid大的更好。
2.quorum机制 4.其他对于FLE选举算法的具体特点要结合FLE数据同步阶段,论文中叫做Recovery阶段
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)