Raft算法

Raft算法,第1张

Raft算法 上篇回顾:Raft算法_SOFAJRaft源码学习_(一、背景及选主演示) 概览

Counter演示程序的构成,可以参考官方文档:

https://www.sofastack.tech/projects/sofa-jraft/counter-example/

CounterServer是主启动入口,进去以后就进行了相关的配置,最后调用了集群的start方法,启动集群:

// 启动
this.node = this.raftGroupService.start();

 启动后的大概流程如下:

代码详细分析 预投票箱 / 投票箱

当预投票或投票流程发起的时候,都会初始化一个投票箱:prevVoteCtx / voteCtx 都是 Ballot 类型的对象,主要字段是quorum(法定票数)

this.prevVoteCtx.init(this.conf.getConf(), this.conf.isStable() ? null : this.conf.getOldConf());
this.voteCtx.init(this.conf.getConf(), this.conf.isStable() ? null : this.conf.getOldConf());

法定票数quorum在投票箱Ballot的初始化方法init()中生成,为集群主机数减半:

this.quorum = this.peers.size() / 2 + 1;

这里投票箱的 *** 作,和我们常识中的投票箱不太一样,我们常识中的投票箱,是初始一个空箱子,然后不断增加票数。这里的投票箱,是初始一个法定票数,每得到1票,则法定票数减去1,直到减到0为止,则认为投票表决通过。

投票箱有个grant方法,用于进行法定票数的自减:

当法定票数自减到0后,表示投票数量超过半数,则该投票箱达到投票通过的结果:

public boolean isGranted() {
    return this.quorum <= 0 && this.oldQuorum <= 0;
}
  选举计时器 electionTimer

集群中所有主机启动后,都会开启这个定时器,周期触发,时间一到,就发起投票流程,投自己一票,然后给集群其它主机发请求,要求对方回应自己的选举请求。

所以选举计时器是集群中比较关键的一个计时器,每个主机通过该计时器都有可能自荐为leader,防止集群无主的情况出现。作为follower,计时器的重置是收到leader的心跳请求触发的,直观的可以看看第一部分提到的raft算法演示动画:http://thesecretlivesofdata.com/raft/

 

选举计时器在NodeImpl初始化的时候定义,并在随后开启。

this.electionTimer = new RepeatedTimer(name, this.options.getElectionTimeoutMs(), TIMER_FACTORY.getElectionTimer(this.options.isSharedElectionTimer(), name)) {

    @Override
    protected void onTrigger() {
        handleElectionTimeout();
    }

    @Override
    protected int adjustTimeout(final int timeoutMs) {
        return randomTimeout(timeoutMs);
    }
};
// Learner node will not trigger the election timer.
if (!isLearner()) {
    this.electionTimer.restart();
} else {
    LOG.info("Node {} is a learner, election timer is not started.", this.nodeId);
}

其中,handleElectionTimeout方法用于处理计时器到期后的动作;randomTimeout用于产生随机时间,这样集群中不同主机的计时器到期时间随机,可以防止所有主机在同一时间开始选举自己,然后都拿不到过半投票,又重新开始在同一时间发起选举,陷入死循环...,但是如果有的主机先开始发起选举,则很大概率可以获取大多数的选票,成为leader。

至于这个RepeatedTimer是个什么东西,我们看看其内容:

public Timer createTimer(final String name) {
    return new HashedWheelTimer(new NamedThreadFactory(name, true), 1, TimeUnit.MILLISECONDS, 2048);
}

可以看到,这个重复计数器是一个HashedWheelTimer,基于哈希轮算法,具体原理可参见:

netty中的定时机制HashedWheelTimer - 简书 

21 技巧篇:延迟任务处理神器之时间轮 HashedWheelTimer.md

总结:集群中每个主机启动时都会开启一个electionTimer,成为leader后会暂停自己的electionTimer,follower会通过接收leader的heartbeat请求,持续保持electionTimer计时,一旦leader挂掉或者网络故障,收不到leader的heartbeat请求的follower都会因为electionTimer的计时到期,触发选举自荐流程,准备选举自己为新的leader。

前面还留了个问题,为什么先发起投票,就能大概率成为leader呢?下面看集群中一个主机收到预投票/投票请求后的处理逻辑:

处理其它主机的预投票 / 投票请求

在开始源码之前,有两个比较重要的概念需要了解

1. term:leader任期,集群中一个新leader产生后,term值会自增1 

2. logIndex:日志索引,集群中的日志索引也是自增的,leader每次发起appendEntries(追加日志记录)请求的时候,日志内容会追加,日志索引会自增

当主机收到预投票请求后:

1. 请求的term比自己的term小,则返回reject,这种情况显然对方的数据没有自己的新,对方的领导任期都少了几轮。 

2. 请求的logIndex比自己的小,也返回reject,这种情况也是对方的数据没自己的新。

3. 其它情况,返回granted,认可对方的选举自荐。

当主机收到投票请求后,同样会执行上面的逻辑,但是额外的:

如果发现请求的term比自己大,则更新自己保存的term值,向请求者看齐,这样整个集群的term值很快会保持最新值。

上面的流程可以看到,选举流程分为preVote和vote的好处是,在preVote,集群中的其它主机并不更新自己的term值,只有当候选者真的获取到了集群过半选票之后,再在正式的vote中大家才更新自己term值。可以防止有的主机,和在任的leader通信中断,于是它不断发起自荐请求,每次term都加一,但是最终又不会获选,但是拉高了集群整体的term值,并导致在任的正常leader因为term值过低而stepDown。

有了preVote环节,即使有主机带着自增的term号发起自荐,但是因其logIndex不是最新的,有可能不会获得过半票数,自荐失败退出,于是集群的term号不会更新,保留原值。

发起投票和响应投票相关的代码:

com.alipay.sofa.jraft.core.NodeImpl#preVote //预投票
com.alipay.sofa.jraft.core.NodeImpl#electSelf //投票
com.alipay.sofa.jraft.core.NodeImpl#handlePreVoteRequest //处理预投票请求
com.alipay.sofa.jraft.core.NodeImpl#handleRequestVoteRequest //处理投票请求

preVote流程:

  • 初始化预投票箱
  • 对其它主机发起RPC请求:rpcService.preVote
  • 预投自己一票

当收到预投票数量过半时,开始投票流程

投票流程:

  • 初始化投票箱
  • 停止electionTimer、开启vouteTimer
  • 对其它主机发起RPC请求:rpcService.requestVote
  • 投自己一票

当收到投票数量过半时,开始becomeLeader流程

voteTimer、stepDownTimer

从上面的流程图还可以发现两个Timer:

一句话解释:

 voteTimer:是为了防止正式投票流程超时的一个计时器,在指定的时间内没达成一致,候选者会退出投票流程,重新开始发起新一轮的预投票。

(为什么preVote过程不需要一个timer?因为preVote大家都没有修改自己的term值,都有机会发起preVote,整个过程不会因为少量主机的故障而hang住)

stepDownTimer:防止集群过半主机挂掉以后,leader仍然在这种不健康的状态继续提供服务。

心跳流程

leader发送心跳请求,其实底层是复用的发送日志追加请求,只是日志内容是空的而已:

public static void sendHeartbeat(final ThreadId id, final RpcResponseClosure closure) {
    final Replicator r = (Replicator) id.lock();
    if (r == null) {
        RpcUtils.runClosureInThread(closure, new Status(RaftError.EHOSTDOWN, "Peer %s is not connected", id));
        return;
    }
    //id unlock in send empty entries.
    r.sendEmptyEntries(true, closure);
}

注意SOFA_JRaft里面的RPC调用,都是采取的回调方式,所以可以看到心跳请求发送的时候,有注册一个回调函数:

RpcResponseClosure heartbeatDone;
// Prefer passed-in closure.
if (heartBeatClosure != null) {
    heartbeatDone = heartBeatClosure;
} else {
    heartbeatDone = new RpcResponseClosureAdapter() {

        @Override
        public void run(final Status status) {
            onHeartbeatReturned(Replicator.this.id, status, request, getResponse(), monotonicSendTimeMs);
        }
    };
}
this.heartbeatInFly = this.rpcService.appendEntries(this.options.getPeerId().getEndpoint(), request,
    this.options.getElectionTimeoutMs() / 2, heartbeatDone);

在回调函数onHeartbeatReturned里面,发起新一轮的心跳计时:

r.startHeartbeatTimer(startTimeMs);

注意:可以看到这里的心跳计时器不是前面那种RepeatedTimer,能反复自动执行,而是一个延迟执行的计时器,开启后只执行一次,然后在收到对方的成功响应后,再开启下一轮执行。

作为follower,在收到日志追加请求以后,在handleAppendEntriesRequest里面,对本地的lastLeaderTimestamp字段更新为当前最新时间,这样electionTimer就不会处理超时逻辑。

updateLastLeaderTimestamp(Utils.monotonicMs());
private void updateLastLeaderTimestamp(final long lastLeaderTimestamp) {
    this.lastLeaderTimestamp = lastLeaderTimestamp;
}

其中,electionTimer使用该时间参数的地方为:

private boolean isCurrentLeaderValid() {
    return Utils.monotonicMs() - this.lastLeaderTimestamp < this.options.getElectionTimeoutMs();
}

可以看到,只要lastLeaderTimestamp是最新的,即被leader刚刚更新过,则认为leader是活着的,则electionTimer不会触发自荐选举流程,等待下一轮的计时到期。

欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/zaji/5683547.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存