Fabric Raft源码分析之leader选举

Fabric Raft源码分析之leader选举,第1张

Fabric Raft源码分析之leader选举 leader选举

fabric raft直接使用的etcd raft的仓库,所以以下很多实现均在etcd仓库的raft代码中。选举过程和论文中所描述的有所不同,在于它多了一个预选举流程。
为什么多了一个阶段呢?考虑这个场景,网络中产生了一个分区,在该分区中,有个candidate一直在选举,但是它选举肯定会失败,因为肯定没有超过半数的投票,但是它会一直重试,在重试的过程中会不断增加它自己的term值,当它重新加入正常集群中后,可能由于它的term值远远大于当前leader,会触发重新选举,但是它又没有最新的log导致选举失败,最终造成集群的瘫痪;加入预选举阶段,只有在预选举阶段收到了超过半数的投票才会允许其增加term值。如果当前进入candidate阶段后超时也会再次进入precandidate阶段的。

2.1 竞选阶段 2.1.2 超时选举

节点身份变为follower和candidate的时候,需要运行一个定时器Tick,这个Tick会选取一个随机超时时间,一旦当前leader达到超时时间,即在该随机时间内没有收到leader的消息时,就会发起选举。这在etcd源码包中etcd/raft/node.go中的(n *node) run(r *raft)函数中,该tick对于leader又不一样,在leader章节有介绍。

当触发选举时,follower/preCandidate/candidate会触发调用r.Step(pb.Message{From: r.id, Type: pb.MsgHup}),处理MsgHup消息类型,这在etcd/raft/raft.go中的(r *raft) Step(m pb.Message) error函数中:

  1. 拿到当前日志中applied+1到committed+1之间的日志,如果发现还有提交的日志没有apply到状态机的话,放弃此次选举,
  2. 否则区分进入预选举还是选举阶段,如果预选举开关开启(默认开启),那么就进入预选举阶段,所以无论在哪个阶段超时一定先进入预选举阶段。选举阶段由预选举阶段完成后触发。
case pb.MsgHup:
        if r.state != StateLeader {
            ents, err := r.raftLog.slice(r.raftLog.applied+1, r.raftLog.committed+1, noLimit)
            if err != nil {
                r.logger.Panicf("unexpected error getting unapplied entries (%v)", err)
            }
            if n := numOfPendingConf(ents); n != 0 && r.raftLog.committed > r.raftLog.applied {
                r.logger.Warningf("%x cannot campaign at term %d since there are still %d pending configuration changes to apply", r.id, r.Term, n)
                return nil
            }

            r.logger.Infof("%x is starting a new election at term %d", r.id, r.Term)
            if r.preVote {
                r.campaign(campaignPreElection)
            } else {
                r.campaign(campaignElection)
            }
        } else {
            r.logger.Debugf("%x ignoring MsgHup because already leader", r.id)
        }

由上述描述可知,主要的是现在etcd/raft/raft.go中的campaign函数中:

  1. 根据传入的竞选阶段初始化不同的状态preCandiate/candidate,这里注意,如果是campaignTransfer类型,即leader转移,无需preCandidate阶段,直接进入投票阶段,因为经过transfer选中的节点都是leader根据规则选中的,是可信的。

  2. 判断当前投赞成的票数有多少,为什么这里就假设当前发出去的一个投票请求为true去计算呢?因为自己可以给自己投票。如果正好是quorum个,那么就进入真正的投票阶段,或者成为leader

  3. 接下来发送竞选消息,让其他节点为之投票,如果是campaignTransfer类型,需要告知,即携带该信息,在预选阶段,虽然以当前term+1的值发送消息,但是并不增加自己的term,只有选举阶段才会增加自己的term

func (r *raft) campaign(t CampaignType) {
    var term uint64
    var voteMsg pb.MessageType
    if t == campaignPreElection {
        r.becomePreCandidate()
        voteMsg = pb.MsgPreVote
        // PreVote RPCs are sent for the next term before we've incremented r.Term.
        term = r.Term + 1
    } else {
        r.becomeCandidate()
        voteMsg = pb.MsgVote
        term = r.Term
    }
    if r.quorum() == r.poll(r.id, voteRespMsgType(voteMsg), true) {
        // We won the election after voting for ourselves (which must mean that
        // this is a single-node cluster). Advance to the next state.
        if t == campaignPreElection {
            r.campaign(campaignElection)
        } else {
            r.becomeLeader()
        }
        return
    }
    for id := range r.prs {
        if id == r.id {
            continue
        }
        r.logger.Infof("%x [logterm: %d, index: %d] sent %s request to %x at term %d",
            r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), voteMsg, id, r.Term)

        var ctx []byte
        if t == campaignTransfer {
            ctx = []byte(t)
        }
        r.send(pb.Message{Term: term, To: id, Type: voteMsg, Index: r.raftLog.lastIndex(), LogTerm: r.raftLog.lastTerm(), Context: ctx})
    }
}

除此之外,当成为preCandidate/candidate之后,需要发送请求投票的消息,然后收到回复,回复的处理在raft.go的stepCandidate函数中:

r.poll(m.From, m.Type, !m.Reject)将该节点的投票结果记录进vote数组中,并计算当前共有多少偷了赞成票(gr),如果正好是quorum个节点,1. 在StatePreCandidate状态直接进入真正的选举流程,2. 否则直接成为leader,并立马发送一条append消息给所有followers告知其选举成功。如果发现反对票正好达到quorum个节点数,直接降低自己身份为follower。

case myVoteRespType:
        gr := r.poll(m.From, m.Type, !m.Reject)
        ...
        switch r.quorum() {
        case gr:
            if r.state == StatePreCandidate {
                r.campaign(campaignElection)
            } else {
                r.becomeLeader()
                r.bcastAppend()
            }
            case len(r.votes) - gr:
                // pb.MsgPreVoteResp contains future term of pre-candidate
                // m.Term > r.Term; reuse r.Term
                r.becomeFollower(r.Term, None)
        }
2.1.2 leader transfer

当出现一下两种情况时,需要执行leader的转移:1. leader节点被删除;2. leader节点的配置变更(发生了旋转)

需要调用order/consensus/etcdraft/node.go中的abdicateLeader函数进行leader的转移,这个函数执行完leader转移后,还需要等待结果,直到成功转移leader或者节点停机或者超时才会退出。注意:该函数只有leader会执行,follower会直接退出,因为它没有权利转移leader,只能等待当前leader超时或者当前leader转移成功。由leader自己选取一个follower进行转移,选择的条件是pr.RecentActive&&!pr.Pause,即最近发送过消息,又没有发生执行append消息处理失败的节点。

最终实现在etcd/raft/node.go中的(n *node) run(r *raft)函数中,通过recv管道接收该类型的消息:

如果两次转移的follower一样(本次和上一次),那么将放弃此次的转移。限制此次转移只能在一个electionTimeout之内完成,否则将会放弃此次的转移(在tickHeartbeat中)。否则发消息告诉该节点,leader已经超时,该节点可以重新竞选leader。

case pb.MsgTransferLeader:
        if pr.IsLearner {
            r.logger.Debugf("%x is learner. Ignored transferring leadership", r.id)
            return nil
        }
        leadTransferee := m.From
        lastLeadTransferee := r.leadTransferee
        if lastLeadTransferee != None {
            if lastLeadTransferee == leadTransferee {
                r.logger.Infof("%x [term %d] transfer leadership to %x is in progress, ignores request to same node %x",
                    r.id, r.Term, leadTransferee, leadTransferee)
                return nil
            }
            r.abortLeaderTransfer()
            r.logger.Infof("%x [term %d] abort previous transferring leadership to %x", r.id, r.Term, lastLeadTransferee)
        }
        if leadTransferee == r.id {
            r.logger.Debugf("%x is already leader. Ignored transferring leadership to self", r.id)
            return nil
        }
        // Transfer leadership to third party.
        r.logger.Infof("%x [term %d] starts to transfer leadership to %x", r.id, r.Term, leadTransferee)
        // Transfer leadership should be finished in one electionTimeout, so reset r.electionElapsed.
        r.electionElapsed = 0
        r.leadTransferee = leadTransferee
        if pr.Match == r.raftLog.lastIndex() {
            r.sendTimeoutNow(leadTransferee)
            r.logger.Infof("%x sends MsgTimeoutNow to %x immediately as %x already has up-to-date log", r.id, leadTransferee, leadTransferee)
        } else {
            r.sendAppend(leadTransferee)
        }

怎么检测到leader已经转移成功了呢?在node.go中的run(campaign bool)函数中,监听了在开始leader转移注册的一个管道n.subscriberC,当那里检测到本地状态中的leader已经不是None时,说明已经转移成功。

2.2 请求投票消息的处理

处理在etcd/raft/raft.go中,首先判断preVote/Vote消息的term是否比本地的大,是进行处理,否则,如果收到的消息term比本地小,只有对preVote消息返回response拒绝投票并携带本地term信息。这是一个细节,代码也只简单的写了个注释,说不回复的话会导致集群死锁,这使我思考了许久,因为我看到一旦收到leader的消息它就自动变为follower了啊,确实很细节,考虑以下场景:

现有 2 f + 1 2f+1 2f+1个节点,由raft原理可知,quorum数必须是 > = f + 1 >=f+1 >=f+1,现在由于网络原因,造成 f + 2 f+2 f+2的分区一和 f − 1 f-1 f−1的分区二两个分区,而分区一中一个节点node1又被分出去,一直在为自己选举,但是一直不成功,剩余节点还是可以正常共识的,当term增加后,分区一又有一节点宕机,导致该集群不work,且剩余的节点都在选举,这时候,node1又重新加入分区一,但是它的term是落后的,它还在为自己选举,但是如果其他节点都是不回复的话,它会一直重试,因为它只能得到自己的投票,假设剩余集群中是node2在选举,它发出投票请求,但是只能获得 f f f个赞成票,因为node1已经给自己投票了,这是无解的,因为没有条件让他俩退出当前选举状态重来,所以就死锁了。

还有一个问题:为什么只需要处理preVote消息呢?因为无论是preCandidate/candidate/follower超时都会首先进入preCandidate状态,所以只需要处理这一种消息类型就可。

解决完上述问题后,开始诉说处理流程,只有比本地term大或者相等的投票消息才会被处理。node中有一个Vote字段记录本轮投票阶段投给了谁,如果是preCandidate/candidate一直是自己,优先给自己投票。当投过票且不是给当前节点投的就会直接拒绝。当增长term后(更新状态,一个term的vote字段只会被赋值一次,不会被清空)后才会清空该字段。

那么问题来了,这里又是一个细节:在原始的raft中,当出现分票现象时,是通过random超时时间和增长的term解决的,但是由于preVote的存在,term不会增长,如果分票了怎么办?虽然是random超时时间,但是term不增长,vote字段就不会清空啊。其实在preVote阶段,并没有限制一人一票,即vote字段不会记录,一人可以投多票,只有当真正进入Vote阶段才会有一人一票的限制。

接下来看代码:

这是第一次筛选,如果不是leader transfer,需要判断一下,如果当前checkQuorum开关打开且leader存在且还没有超时,不理睬此次选举。如果是不用管当前是否有leader。

case m.Term > r.Term:
        if m.Type == pb.MsgVote || m.Type == pb.MsgPreVote {
            force := bytes.Equal(m.Context, []byte(campaignTransfer))
            inLease := r.checkQuorum && r.lead != None && r.electionElapsed < r.electionTimeout
            if !force && inLease {
                // If a server receives a RequestVote request within the minimum election timeout
                // of hearing from a current leader, it does not update its term or grant its vote
                r.logger.Infof("%x [logterm: %d, index: %d, vote: %x] ignored %s from %x [logterm: %d, index: %d] at term %d: lease is not expired (remaining ticks: %d)",
                    r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.Type, m.From, m.LogTerm, m.Index, r.Term, r.electionTimeout-r.electionElapsed)
                return nil
            }
        }

第二次筛选:

其实注释已经写的很明白了,哈哈,翻译一下,如果要我投赞成票,必须要满足以下条件之一:1. 本term没有投过票且还没有出现leader,或者已经给该节点投了赞成票且你的log更新或者一样;2. 如果是preVote只要你的term比我大且你的log更新或者一样,我就给你投赞成票。投完之后,如果是vote阶段,记录投过的票,

case pb.MsgVote, pb.MsgPreVote:
        ...... // 筛选节点类型
        // We can vote if this is a repeat of a vote we've already cast...
        canVote := r.Vote == m.From ||
            // ...we haven't voted and we don't think there's a leader yet in this term...
            (r.Vote == None && r.lead == None) ||
            // ...or this is a PreVote for a future term...
            (m.Type == pb.MsgPreVote && m.Term > r.Term)
        // ...and we believe the candidate is up to date.
        if canVote && r.raftLog.isUpToDate(m.Index, m.LogTerm) {
            r.logger.Infof("%x [logterm: %d, index: %d, vote: %x] cast %s for %x [logterm: %d, index: %d] at term %d",
                r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.Type, m.From, m.LogTerm, m.Index, r.Term)
            // When responding to Msg{Pre,}Vote messages we include the term
            // from the message, not the local term. To see why consider the
            // case where a single node was previously partitioned away and
            // it's local term is now of date. If we include the local term
            // (recall that for pre-votes we don't update the local term), the
            // (pre-)campaigning node on the other end will proceed to ignore
            // the message (it ignores all out of date messages).
            // The term in the original message and current local term are the
            // same in the case of regular votes, but different for pre-votes.
            r.send(pb.Message{To: m.From, Term: m.Term, Type: voteRespMsgType(m.Type)})
            if m.Type == pb.MsgVote {
                // only record real votes.
                r.electionElapsed = 0
                r.Vote = m.From
            }
        } else {
            r.logger.Infof("%x [logterm: %d, index: %d, vote: %x] rejected %s from %x [logterm: %d, index: %d] at term %d",
                r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.Type, m.From, m.LogTerm, m.Index, r.Term)
            r.send(pb.Message{To: m.From, Term: r.Term, Type: voteRespMsgType(m.Type), Reject: true})
        }

欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/zaji/5654760.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-16
下一篇 2022-12-16

发表评论

登录后才能评论

评论列表(0条)

保存