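For the details of the Raft algorithm itself, see my earlier article on distributed consensus algorithms: two-phase commit, three-phase commit, Paxos, Raft, ZooKeeper's leader election, the ZAB protocol, sequential consistency, the data write path, node states, and node roles. The main JRaft concepts used below are: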
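- Node represents a service node.
- Ballot holds the information about one round of voting.
- PeerId represents a participant in a replication group.
- StateMachine: after data has been committed to a Node, the state machine's onApply method is executed.
- electionTimer: the election timer. If the current leader goes down, the node starts a preVote.
- voteTimer: the vote timer. When a vote times out, the node starts a preVote again.
- stepDownTimer: used by the leader to check whether the current node is still alive, check whether any node in the cluster has gone offline, and update the leader's timestamp.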
The electionTimer is a RepeatedTimer. When it fires, it calls handleElectionTimeout, and adjustTimeout randomizes each timeout through randomTimeout:

```java
this.electionTimer = new RepeatedTimer(name, this.options.getElectionTimeoutMs(),
    TIMER_FACTORY.getElectionTimer(this.options.isSharedElectionTimer(), name)) {

    protected void onTrigger() {
        handleElectionTimeout();
    }

    protected int adjustTimeout(final int timeoutMs) {
        return randomTimeout(timeoutMs);
    }
};

private void handleElectionTimeout() {
    boolean doUnlock = true;
    this.writeLock.lock();
    try {
        if (this.state != State.STATE_FOLLOWER) {
            return;
        }
        if (isCurrentLeaderValid()) {
            return;
        }
        resetLeaderId(PeerId.emptyPeer(), new Status(RaftError.ERAFTTIMEDOUT, "Lost connection from leader %s.",
            this.leaderId));

        // Judge whether to launch a election.
        if (!allowLaunchElection()) {
            return;
        }

        doUnlock = false;
        preVote();
    } finally {
        if (doUnlock) {
            this.writeLock.unlock();
        }
    }
}
```
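handleElectionTimeout returns early unless the node is a follower whose leader is no longer valid. In that case it resets leaderId and, if allowLaunchElection() permits, runs preVote. The main operations of one JRaft election are:

preVote ===> handlePreVoteRequest ===> handlePreVoteResponse ===> electSelf ===> handleRequestVoteRequest ===> handleRequestVoteResponse ===> becomeLeader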
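The following is a minimal, self-contained sketch of the timeout decision. It makes the guard clauses concrete, but it is not JRaft code. ElectionTimeoutSketch, shouldStartPreVote and the maxElectionDelayMs parameter are hypothetical. The sketch assumes the randomized timeout is drawn uniformly from [timeoutMs, timeoutMs + maxElectionDelayMs). That is the usual way Raft implementations keep followers from timing out together and splitting the vote.

```java
import java.util.concurrent.ThreadLocalRandom;

final class ElectionTimeoutSketch {

    enum State { LEADER, CANDIDATE, FOLLOWER }

    // Hypothetical helper: draw the timeout from [timeoutMs, timeoutMs + maxElectionDelayMs)
    // so that followers rarely fire at the same moment. maxElectionDelayMs must be > 0.
    static int randomTimeout(int timeoutMs, int maxElectionDelayMs) {
        return ThreadLocalRandom.current().nextInt(timeoutMs, timeoutMs + maxElectionDelayMs);
    }

    // Mirrors the guard clauses of handleElectionTimeout (hypothetical helper).
    static boolean shouldStartPreVote(State state, boolean leaderLeaseValid, boolean allowLaunchElection) {
        if (state != State.FOLLOWER) {
            return false; // only followers react to the election timer
        }
        if (leaderLeaseValid) {
            return false; // the leader was heard from recently enough
        }
        return allowLaunchElection; // JRaft's own extra gate before preVote
    }
}
```

A follower that still trusts its leader, or a node that is already a leader or candidate, never gets as far as preVote.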
```java
private void preVote() {
    // .....
    final LogId lastLogId = this.logManager.getLastLogId(true);

    boolean doUnlock = true;
    this.writeLock.lock();
    try {
        // pre_vote need defense ABA after unlock&writeLock
        if (oldTerm != this.currTerm) {
            LOG.warn("Node {} raise term {} when get lastLogId.", getNodeId(), this.currTerm);
            return;
        }
        this.prevVoteCtx.init(this.conf.getConf(), this.conf.isStable() ? null : this.conf.getOldConf());
        for (final PeerId peer : this.conf.listPeers()) {
            if (peer.equals(this.serverId)) {
                continue;
            }
            if (!this.rpcService.connect(peer.getEndpoint())) {
                LOG.warn("Node {} channel init failed, address={}.", getNodeId(), peer.getEndpoint());
                continue;
            }
            final OnPreVoteRpcDone done = new OnPreVoteRpcDone(peer, this.currTerm);
            done.request = RequestVoteRequest.newBuilder() //
                .setPreVote(true) // it's a pre-vote request.
                .setGroupId(this.groupId) //
                .setServerId(this.serverId.toString()) //
                .setPeerId(peer.toString()) //
                .setTerm(this.currTerm + 1) // next term
                .setLastLogIndex(lastLogId.getIndex()) //
                .setLastLogTerm(lastLogId.getTerm()) //
                .build();
            this.rpcService.preVote(peer.getEndpoint(), done.request, done);
        }
        this.prevVoteCtx.grant(this.serverId);
        if (this.prevVoteCtx.isGranted()) {
            doUnlock = false;
            electSelf();
        }
    } finally {
        if (doUnlock) {
            this.writeLock.unlock();
        }
    }
}
```
```java
RequestVoteRequest.newBuilder() //
    .setPreVote(true) // it's a pre-vote request.
    .setGroupId(this.groupId) //
    .setServerId(this.serverId.toString()) //
    .setPeerId(peer.toString()) //
    .setTerm(this.currTerm + 1) // next term
    .setLastLogIndex(lastLogId.getIndex()) //
    .setLastLogTerm(lastLogId.getTerm()) //
    .build();
```
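At this point the node does not set its own currTerm to currTerm + 1. It only sends the value currTerm + 1 in the request. The formal election is different: there, the node first increments currTerm (currTerm++).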
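The sketch below makes the term difference explicit. PreVoteTermSketch and its VoteRequest record are hypothetical classes that keep only the term-related fields. Building a preVote request leaves currTerm untouched, while the formal election increments it first. In Raft, this is what stops a node that keeps timing out inside a partition from inflating its term. Without it, such a node would force a healthy leader to step down when it reconnects.

```java
final class PreVoteTermSketch {

    // Hypothetical, cut-down request: only what matters for the term comparison.
    record VoteRequest(boolean preVote, long term) {}

    private long currTerm;

    PreVoteTermSketch(long currTerm) {
        this.currTerm = currTerm;
    }

    long currTerm() {
        return currTerm;
    }

    // preVote: advertise the *next* term, but do not change local state.
    VoteRequest buildPreVoteRequest() {
        return new VoteRequest(true, currTerm + 1);
    }

    // electSelf: bump currTerm first, then advertise the new value.
    VoteRequest buildVoteRequest() {
        currTerm++;
        return new VoteRequest(false, currTerm);
    }
}
```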
On each receiving node, the preVote request is handled by handlePreVoteRequest:

```java
public Message handlePreVoteRequest(final RequestVoteRequest request) {
    boolean doUnlock = true;
    this.writeLock.lock();
    try {
        if (!this.state.isActive()) {
            LOG.warn("Node {} is not in active state, currTerm={}.", getNodeId(), this.currTerm);
            return RpcFactoryHelper //
                .responseFactory() //
                .newResponse(RequestVoteResponse.getDefaultInstance(), RaftError.EINVAL,
                    "Node %s is not in active state, state %s.", getNodeId(), this.state.name());
        }
        final PeerId candidateId = new PeerId();
        if (!candidateId.parse(request.getServerId())) {
            LOG.warn("Node {} received PreVoteRequest from {} serverId bad format.", getNodeId(),
                request.getServerId());
            return RpcFactoryHelper //
                .responseFactory() //
                .newResponse(RequestVoteResponse.getDefaultInstance(), RaftError.EINVAL,
                    "Parse candidateId failed: %s.", request.getServerId());
        }
        boolean granted = false;
        // noinspection ConstantConditions
        do {
            if (!this.conf.contains(candidateId)) {
                LOG.warn("Node {} ignore PreVoteRequest from {} as it is not in conf <{}>.", getNodeId(),
                    request.getServerId(), this.conf);
                break;
            }
            if (this.leaderId != null && !this.leaderId.isEmpty() && isCurrentLeaderValid()) {
                LOG.info(
                    "Node {} ignore PreVoteRequest from {}, term={}, currTerm={}, because the leader {}'s lease is still valid.",
                    getNodeId(), request.getServerId(), request.getTerm(), this.currTerm, this.leaderId);
                break;
            }
            if (request.getTerm() < this.currTerm) {
                LOG.info("Node {} ignore PreVoteRequest from {}, term={}, currTerm={}.", getNodeId(),
                    request.getServerId(), request.getTerm(), this.currTerm);
                // A follower replicator may not be started when this node become leader, so we must check it.
                checkReplicator(candidateId);
                break;
            }
            // A follower replicator may not be started when this node become leader, so we must check it.
            // check replicator state
            checkReplicator(candidateId);

            doUnlock = false;
            this.writeLock.unlock();

            final LogId lastLogId = this.logManager.getLastLogId(true);

            doUnlock = true;
            this.writeLock.lock();
            final LogId requestLastLogId = new LogId(request.getLastLogIndex(), request.getLastLogTerm());
            granted = requestLastLogId.compareTo(lastLogId) >= 0;

            LOG.info(
                "Node {} received PreVoteRequest from {}, term={}, currTerm={}, granted={}, requestLastLogId={}, lastLogId={}.",
                getNodeId(), request.getServerId(), request.getTerm(), this.currTerm, granted, requestLastLogId,
                lastLogId);
        } while (false);

        return RequestVoteResponse.newBuilder() //
            .setTerm(this.currTerm) //
            .setGranted(granted) //
            .build();
    } finally {
        if (doUnlock) {
            this.writeLock.unlock();
        }
    }
}
```
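- If the candidate is not in the current configuration, return granted=false.
- If this node's leader is still alive (its lease is still valid), return granted=false.
- If the term in the preVote request is smaller than this node's term, return granted=false.
- If the candidate's last log is older than this node's last log, return granted=false. LogId is compared by term first, then by index.
- If none of the above applies, return granted=true.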
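The rules above can be collected into one pure function. This is a hedged sketch rather than JRaft's code. grantPreVote and the nested LogId record are hypothetical. LogId is assumed to order by term first and index second, which is Raft's "at least as up to date" rule.

```java
final class PreVoteGrantSketch {

    // Hypothetical LogId of the last entry: a higher term wins, then a higher index.
    record LogId(long term, long index) implements Comparable<LogId> {
        @Override
        public int compareTo(LogId o) {
            int c = Long.compare(term, o.term);
            return c != 0 ? c : Long.compare(index, o.index);
        }
    }

    // Same order of checks as handlePreVoteRequest: membership, leader lease,
    // term, then "is the candidate's log at least as new as mine?".
    static boolean grantPreVote(boolean candidateInConf, boolean leaderLeaseValid,
                                long requestTerm, long currTerm,
                                LogId requestLastLogId, LogId lastLogId) {
        if (!candidateInConf) {
            return false;
        }
        if (leaderLeaseValid) {
            return false;
        }
        if (requestTerm < currTerm) {
            return false;
        }
        return requestLastLogId.compareTo(lastLogId) >= 0;
    }
}
```

Suppose the local last log is (term 3, index 10). A candidate whose last log is (term 4, index 1) is still granted, because a higher last term wins regardless of log length.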
Back on the candidate, each reply is processed by handlePreVoteResponse:

```java
public void handlePreVoteResponse(final PeerId peerId, final long term, final RequestVoteResponse response) {
    boolean doUnlock = true;
    this.writeLock.lock();
    try {
        if (this.state != State.STATE_FOLLOWER) {
            LOG.warn("Node {} received invalid PreVoteResponse from {}, state not in STATE_FOLLOWER but {}.",
                getNodeId(), peerId, this.state);
            return;
        }
        if (term != this.currTerm) {
            LOG.warn("Node {} received invalid PreVoteResponse from {}, term={}, currTerm={}.", getNodeId(),
                peerId, term, this.currTerm);
            return;
        }
        if (response.getTerm() > this.currTerm) {
            LOG.warn("Node {} received invalid PreVoteResponse from {}, term {}, expect={}.", getNodeId(), peerId,
                response.getTerm(), this.currTerm);
            stepDown(response.getTerm(), false, new Status(RaftError.EHIGHERTERMRESPONSE,
                "Raft node receives higher term pre_vote_response."));
            return;
        }
        LOG.info("Node {} received PreVoteResponse from {}, term={}, granted={}.", getNodeId(), peerId,
            response.getTerm(), response.getGranted());
        // check granted quorum?
        if (response.getGranted()) {
            this.prevVoteCtx.grant(peerId);
            if (this.prevVoteCtx.isGranted()) {
                doUnlock = false;
                electSelf();
            }
        }
    } finally {
        if (doUnlock) {
            this.writeLock.unlock();
        }
    }
}
```
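- If the node is no longer a FOLLOWER, the response is ignored.
- If the current term differs from the term at which the preVote was sent, a new round has started. The response is stale and is discarded.
- If the term in the response (the responder's term) is greater than this node's term, the responder is in a later round. The node calls stepDown to adopt that term and returns.
- If the response grants the vote (response.getGranted() is true), the peer is recorded in prevVoteCtx. The node then checks whether more than half have granted, and if so, calls electSelf to start the formal election.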
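These response-handling rules can be sketched with a plain set of granting peers. PreVoteResponseSketch is hypothetical and simplifies the real code in three ways. It replaces prevVoteCtx with a HashSet and a majority count. It reduces stepDown to adopting the higher term. It records electSelfCalled instead of actually starting an election.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

final class PreVoteResponseSketch {

    enum State { FOLLOWER, CANDIDATE }

    State state = State.FOLLOWER;
    long currTerm;
    boolean electSelfCalled; // set instead of actually calling electSelf()
    private final int quorum;
    private final Set<String> granted = new HashSet<>();

    PreVoteResponseSketch(String self, List<String> conf, long currTerm) {
        this.currTerm = currTerm;
        this.quorum = conf.size() / 2 + 1; // conf includes this node
        this.granted.add(self);           // preVote() grants itself before any reply arrives
    }

    // sentTerm: the currTerm captured when the request was sent.
    void onPreVoteResponse(String peer, long sentTerm, long responseTerm, boolean grantedByPeer) {
        if (state != State.FOLLOWER) {
            return; // this node is no longer pre-voting
        }
        if (sentTerm != currTerm) {
            return; // stale: the term changed since this round started
        }
        if (responseTerm > currTerm) {
            currTerm = responseTerm; // simplified stepDown: adopt the higher term, abandon the round
            return;
        }
        if (grantedByPeer) {
            granted.add(peer);
            if (granted.size() >= quorum) {
                electSelfCalled = true;
            }
        }
    }
}
```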
Both prevVoteCtx and voteCtx are Ballot objects. Ballot.init looks like this:

```java
public boolean init(final Configuration conf, final Configuration oldConf) {
    this.peers.clear();
    this.oldPeers.clear();
    this.quorum = this.oldQuorum = 0;
    int index = 0;
    if (conf != null) {
        for (final PeerId peer : conf) {
            this.peers.add(new UnfoundPeerId(peer, index++, false));
        }
    }

    this.quorum = this.peers.size() / 2 + 1;
    if (oldConf == null) {
        return true;
    }
    index = 0;
    for (final PeerId peer : oldConf) {
        this.oldPeers.add(new UnfoundPeerId(peer, index++, false));
    }

    this.oldQuorum = this.oldPeers.size() / 2 + 1;
    return true;
}
```
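init sets the Ballot's quorum to (number of peers in the configuration) / 2 + 1. When a configuration change is in progress (oldConf is not null), oldQuorum is computed the same way from the old configuration. grant decrements the matching counter the first time a given peer votes. isGranted returns true only when both counters have reached zero or below, that is, when a majority of the configuration (and of the old one, if present) has granted.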
```java
public void grant(final PeerId peerId) {
    grant(peerId, new PosHint());
}

public PosHint grant(final PeerId peerId, final PosHint hint) {
    UnfoundPeerId peer = findPeer(peerId, this.peers, hint.pos0);
    if (peer != null) {
        if (!peer.found) {
            peer.found = true;
            this.quorum--;
        }
        hint.pos0 = peer.index;
    } else {
        hint.pos0 = -1;
    }
    if (this.oldPeers.isEmpty()) {
        hint.pos1 = -1;
        return hint;
    }
    peer = findPeer(peerId, this.oldPeers, hint.pos1);
    if (peer != null) {
        if (!peer.found) {
            peer.found = true;
            this.oldQuorum--;
        }
        hint.pos1 = peer.index;
    } else {
        hint.pos1 = -1;
    }

    return hint;
}

public boolean isGranted() {
    return this.quorum <= 0 && this.oldQuorum <= 0;
}
```
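The following sketch reimplements Ballot's counting logic with two maps. JRaft itself uses indexed UnfoundPeerId lists plus PosHint, which, judging from the code above, only speed up peer lookup. BallotSketch is a hypothetical name. It illustrates one point: during a configuration change (oldConf is not null), a grant must reach a majority of both configurations.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class BallotSketch {

    // peer -> "has this peer's grant been counted yet?"
    private final Map<String, Boolean> peers = new HashMap<>();
    private final Map<String, Boolean> oldPeers = new HashMap<>();
    private int quorum;
    private int oldQuorum;

    BallotSketch(List<String> conf, List<String> oldConf) {
        conf.forEach(p -> peers.put(p, false));
        quorum = peers.size() / 2 + 1;
        if (oldConf != null) { // joint configuration: the old members must agree too
            oldConf.forEach(p -> oldPeers.put(p, false));
            oldQuorum = oldPeers.size() / 2 + 1;
        }
    }

    void grant(String peer) {
        // Count each peer at most once per configuration; unknown peers are ignored.
        if (Boolean.FALSE.equals(peers.get(peer))) {
            peers.put(peer, true);
            quorum--;
        }
        if (Boolean.FALSE.equals(oldPeers.get(peer))) {
            oldPeers.put(peer, true);
            oldQuorum--;
        }
    }

    boolean isGranted() {
        return quorum <= 0 && oldQuorum <= 0;
    }
}
```

In the joint case, votes from a and c satisfy the new configuration {a, b, c} but not the old one {a, b, d}. Only after b also grants does isGranted() return true.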
Once the preVote reaches a majority, electSelf starts the formal election:

```java
private void electSelf() {
    long oldTerm;
    try {
        LOG.info("Node {} start vote and grant vote self, term={}.", getNodeId(), this.currTerm);
        if (!this.conf.contains(this.serverId)) {
            LOG.warn("Node {} can't do electSelf as it is not in {}.", getNodeId(), this.conf);
            return;
        }
        if (this.state == State.STATE_FOLLOWER) {
            LOG.debug("Node {} stop election timer, term={}.", getNodeId(), this.currTerm);
            this.electionTimer.stop();
        }
        resetLeaderId(PeerId.emptyPeer(), new Status(RaftError.ERAFTTIMEDOUT,
            "A follower's leader_id is reset to NULL as it begins to request_vote."));
        this.state = State.STATE_CANDIDATE;
        this.currTerm++;
        this.votedId = this.serverId.copy();
        LOG.debug("Node {} start vote timer, term={} .", getNodeId(), this.currTerm);
        this.voteTimer.start();
        this.voteCtx.init(this.conf.getConf(), this.conf.isStable() ? null : this.conf.getOldConf());
        oldTerm = this.currTerm;
    } finally {
        this.writeLock.unlock();
    }

    final LogId lastLogId = this.logManager.getLastLogId(true);

    this.writeLock.lock();
    try {
        // vote need defense ABA after unlock&writeLock
        if (oldTerm != this.currTerm) {
            LOG.warn("Node {} raise term {} when getLastLogId.", getNodeId(), this.currTerm);
            return;
        }
        for (final PeerId peer : this.conf.listPeers()) {
            if (peer.equals(this.serverId)) {
                continue;
            }
            if (!this.rpcService.connect(peer.getEndpoint())) {
                LOG.warn("Node {} channel init failed, address={}.", getNodeId(), peer.getEndpoint());
                continue;
            }
            final OnRequestVoteRpcDone done = new OnRequestVoteRpcDone(peer, this.currTerm, this);
            done.request = RequestVoteRequest.newBuilder() //
                .setPreVote(false) // It's not a pre-vote request.
                .setGroupId(this.groupId) //
                .setServerId(this.serverId.toString()) //
                .setPeerId(peer.toString()) //
                .setTerm(this.currTerm) //
                .setLastLogIndex(lastLogId.getIndex()) //
                .setLastLogTerm(lastLogId.getTerm()) //
                .build();
            this.rpcService.requestVote(peer.getEndpoint(), done.request, done);
        }

        this.metaStorage.setTermAndVotedFor(this.currTerm, this.serverId);
        this.voteCtx.grant(this.serverId);
        if (this.voteCtx.isGranted()) {
            becomeLeader();
        }
    } finally {
        this.writeLock.unlock();
    }
}
```
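The formal vote performs the following operations:
- If the node was a follower, it stops electionTimer. It then clears leaderId, switches to STATE_CANDIDATE, increments currTerm (currTerm++), votes for itself (votedId = serverId), starts voteTimer and initializes voteCtx.
- It sends a RequestVoteRequest to every other peer. The request is almost the same as the preVote request, with two differences: preVote=false, and the term carried is the already-incremented currTerm rather than currTerm + 1.
- It persists the new term and its vote with metaStorage.setTermAndVotedFor and grants its own vote in voteCtx. If that alone is already a majority (for example, in a single-node group), it calls becomeLeader immediately.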
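The state changes made by electSelf can be modeled in a few lines. ElectSelfSketch is hypothetical. Timers, locking, RPCs and persistence (metaStorage.setTermAndVotedFor) are left out. The sentTo list merely records which peers would receive a RequestVoteRequest.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

final class ElectSelfSketch {

    enum State { FOLLOWER, CANDIDATE, LEADER }

    final String serverId;
    final List<String> peers; // the whole configuration, including serverId
    State state = State.FOLLOWER;
    long currTerm;
    String votedId;
    final Set<String> votes = new HashSet<>();
    final List<String> sentTo = new ArrayList<>(); // peers that would get a RequestVoteRequest

    ElectSelfSketch(String serverId, List<String> peers, long currTerm) {
        this.serverId = serverId;
        this.peers = peers;
        this.currTerm = currTerm;
    }

    void electSelf() {
        if (!peers.contains(serverId)) {
            return; // a node outside the configuration cannot run
        }
        state = State.CANDIDATE;
        currTerm++;         // unlike preVote, the term really advances here
        votedId = serverId; // vote for itself in the new term
        votes.clear();
        for (String peer : peers) {
            if (!peer.equals(serverId)) {
                sentTo.add(peer); // RequestVoteRequest(preVote=false, term=currTerm)
            }
        }
        // The real code persists (currTerm, votedId) here before counting its own vote.
        votes.add(serverId);
        if (votes.size() >= peers.size() / 2 + 1) {
            state = State.LEADER; // e.g. a single-node group wins immediately
        }
    }
}
```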
Each voter handles the formal request in handleRequestVoteRequest:

```java
public Message handleRequestVoteRequest(final RequestVoteRequest request) {
    boolean doUnlock = true;
    this.writeLock.lock();
    try {
        if (!this.state.isActive()) {
            LOG.warn("Node {} is not in active state, currTerm={}.", getNodeId(), this.currTerm);
            return RpcFactoryHelper //
                .responseFactory() //
                .newResponse(RequestVoteResponse.getDefaultInstance(), RaftError.EINVAL,
                    "Node %s is not in active state, state %s.", getNodeId(), this.state.name());
        }
        final PeerId candidateId = new PeerId();
        if (!candidateId.parse(request.getServerId())) {
            LOG.warn("Node {} received RequestVoteRequest from {} serverId bad format.", getNodeId(),
                request.getServerId());
            return RpcFactoryHelper //
                .responseFactory() //
                .newResponse(RequestVoteResponse.getDefaultInstance(), RaftError.EINVAL,
                    "Parse candidateId failed: %s.", request.getServerId());
        }

        // noinspection ConstantConditions
        do {
            // check term
            if (request.getTerm() >= this.currTerm) {
                LOG.info("Node {} received RequestVoteRequest from {}, term={}, currTerm={}.", getNodeId(),
                    request.getServerId(), request.getTerm(), this.currTerm);
                // increase current term, change state to follower
                if (request.getTerm() > this.currTerm) {
                    stepDown(request.getTerm(), false, new Status(RaftError.EHIGHERTERMRESPONSE,
                        "Raft node receives higher term RequestVoteRequest."));
                }
            } else {
                // ignore older term
                LOG.info("Node {} ignore RequestVoteRequest from {}, term={}, currTerm={}.", getNodeId(),
                    request.getServerId(), request.getTerm(), this.currTerm);
                break;
            }
            doUnlock = false;
            this.writeLock.unlock();

            final LogId lastLogId = this.logManager.getLastLogId(true);

            doUnlock = true;
            this.writeLock.lock();
            // vote need ABA check after unlock&writeLock
            if (request.getTerm() != this.currTerm) {
                LOG.warn("Node {} raise term {} when get lastLogId.", getNodeId(), this.currTerm);
                break;
            }

            final boolean logIsOk = new LogId(request.getLastLogIndex(), request.getLastLogTerm())
                .compareTo(lastLogId) >= 0;

            if (logIsOk && (this.votedId == null || this.votedId.isEmpty())) {
                stepDown(request.getTerm(), false, new Status(RaftError.EVOTEFORCANDIDATE,
                    "Raft node votes for some candidate, step down to restart election_timer."));
                this.votedId = candidateId.copy();
                this.metaStorage.setVotedFor(candidateId);
            }
        } while (false);

        return RequestVoteResponse.newBuilder() //
            .setTerm(this.currTerm) //
            .setGranted(request.getTerm() == this.currTerm && candidateId.equals(this.votedId)) //
            .build();
    } finally {
        if (doUnlock) {
            this.writeLock.unlock();
        }
    }
}
```
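- Compare the request's term with the current term. If the current term is greater, the request is ignored and granted=false is returned. If the request's term is greater, the node calls stepDown to adopt it.
- After re-acquiring the lock, if the term has changed in the meantime (the ABA check), the node stops without voting.
- Check whether the candidate's log is at least as up to date as this node's (logIsOk), meaning the candidate's data is at least as new as the local data. If it is, and this node has not yet voted in this term, three things happen:
  - The node calls stepDown, so it stays in or returns to STATE_FOLLOWER and restarts electionTimer instead of running its own election.
  - It sets votedId to the candidate, recording that its vote went to the candidate.
  - It persists the vote with metaStorage.setVotedFor.
- The response has granted=true only if the current term equals the request's term and votedId equals the requesting candidate. Both conditions together mean this node gave its vote to that candidate.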
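The voter-side rules condense into the sketch below. It is hypothetical (RequestVoteSketch and its LogId record) and leaves out locking and persistence. It assumes that moving to a higher term clears votedId, which is how a node ends up voting at most once per term. JRaft's stepDown resets votedId when the term increases.

```java
final class RequestVoteSketch {

    // Hypothetical LogId: ordered by term first, then by index.
    record LogId(long term, long index) implements Comparable<LogId> {
        @Override
        public int compareTo(LogId o) {
            int c = Long.compare(term, o.term);
            return c != 0 ? c : Long.compare(index, o.index);
        }
    }

    long currTerm;
    String votedId; // null means "has not voted in currTerm"
    final LogId lastLogId;

    RequestVoteSketch(long currTerm, LogId lastLogId) {
        this.currTerm = currTerm;
        this.lastLogId = lastLogId;
    }

    // Returns the value of the response's "granted" flag.
    boolean handleRequestVote(String candidateId, long requestTerm, LogId requestLastLogId) {
        if (requestTerm < currTerm) {
            return false; // the candidate is behind: ignore it
        }
        if (requestTerm > currTerm) {
            currTerm = requestTerm; // simplified stepDown: adopt the higher term...
            votedId = null;         // ...in which this node has not voted yet
        }
        boolean logIsOk = requestLastLogId.compareTo(lastLogId) >= 0;
        if (logIsOk && votedId == null) {
            votedId = candidateId; // the real code also persists this vote
        }
        return requestTerm == currTerm && candidateId.equals(votedId);
    }
}
```

Once this node votes for b in a term, it turns down c in that same term even though c's log is newer. A retry from b is still granted.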
The candidate processes the replies in handleRequestVoteResponse:

```java
public void handleRequestVoteResponse(final PeerId peerId, final long term, final RequestVoteResponse response) {
    this.writeLock.lock();
    try {
        if (this.state != State.STATE_CANDIDATE) {
            LOG.warn("Node {} received invalid RequestVoteResponse from {}, state not in STATE_CANDIDATE but {}.",
                getNodeId(), peerId, this.state);
            return;
        }
        // check stale term
        if (term != this.currTerm) {
            LOG.warn("Node {} received stale RequestVoteResponse from {}, term={}, currTerm={}.", getNodeId(),
                peerId, term, this.currTerm);
            return;
        }
        // check response term
        if (response.getTerm() > this.currTerm) {
            LOG.warn("Node {} received invalid RequestVoteResponse from {}, term={}, expect={}.", getNodeId(),
                peerId, response.getTerm(), this.currTerm);
            stepDown(response.getTerm(), false, new Status(RaftError.EHIGHERTERMRESPONSE,
                "Raft node receives higher term request_vote_response."));
            return;
        }
        // check granted quorum?
        if (response.getGranted()) {
            this.voteCtx.grant(peerId);
            if (this.voteCtx.isGranted()) {
                becomeLeader();
            }
        }
    } finally {
        this.writeLock.unlock();
    }
}
```
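- If the node is no longer a CANDIDATE, the response is ignored.
- Check whether the term at which the request was sent (the term parameter captured by the RPC callback) still equals the current term. If it does not, the response is stale and the method returns.
- Check whether the term in the response is greater than the current term. If it is, the node calls stepDown to adopt that term and returns.
- If the vote is granted, it is recorded in voteCtx. Once more than half of the votes have been obtained, the node is promoted to leader by running becomeLeader.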
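The term parameter is compared with currTerm because the term is captured when the request is sent. Replies that arrive after a new round has started are therefore dropped. The hypothetical VoteResponseSketch below shows this. It assumes a new round bumps the term the way electSelf does, and it counts votes with a HashSet instead of voteCtx.

```java
import java.util.HashSet;
import java.util.Set;

final class VoteResponseSketch {

    enum State { FOLLOWER, CANDIDATE, LEADER }

    final String serverId;
    final int clusterSize;
    State state = State.FOLLOWER;
    long currTerm;
    final Set<String> votes = new HashSet<>();

    VoteResponseSketch(String serverId, int clusterSize, long currTerm) {
        this.serverId = serverId;
        this.clusterSize = clusterSize;
        this.currTerm = currTerm;
    }

    // Starts a new round and returns the term that outgoing requests capture.
    long startElection() {
        state = State.CANDIDATE;
        currTerm++;
        votes.clear();
        votes.add(serverId);
        return currTerm;
    }

    void onRequestVoteResponse(String peer, long sentTerm, long responseTerm, boolean granted) {
        if (state != State.CANDIDATE) {
            return;
        }
        if (sentTerm != currTerm) {
            return; // reply to an earlier round: stale
        }
        if (responseTerm > currTerm) {
            currTerm = responseTerm; // simplified stepDown
            state = State.FOLLOWER;
            return;
        }
        if (granted) {
            votes.add(peer);
            if (votes.size() >= clusterSize / 2 + 1) {
                state = State.LEADER; // stands in for becomeLeader()
            }
        }
    }
}
```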
```java
private void becomeLeader() {
    Requires.requireTrue(this.state == State.STATE_CANDIDATE, "Illegal state: " + this.state);
    LOG.info("Node {} become leader of group, term={}, conf={}, oldConf={}.", getNodeId(), this.currTerm,
        this.conf.getConf(), this.conf.getOldConf());
    // cancel candidate vote timer
    stopVoteTimer();
    this.state = State.STATE_LEADER;
    this.leaderId = this.serverId.copy();
    this.replicatorGroup.resetTerm(this.currTerm);
    // Start follower's replicators
    for (final PeerId peer : this.conf.listPeers()) {
        if (peer.equals(this.serverId)) {
            continue;
        }
        LOG.debug("Node {} add a replicator, term={}, peer={}.", getNodeId(), this.currTerm, peer);
        if (!this.replicatorGroup.addReplicator(peer)) {
            LOG.error("Fail to add a replicator, peer={}.", peer);
        }
    }
    // Start learner's replicators
    for (final PeerId peer : this.conf.listLearners()) {
        LOG.debug("Node {} add a learner replicator, term={}, peer={}.", getNodeId(), this.currTerm, peer);
        if (!this.replicatorGroup.addReplicator(peer, ReplicatorType.Learner)) {
            LOG.error("Fail to add a learner replicator, peer={}.", peer);
        }
    }

    // init commit manager
    this.ballotBox.resetPendingIndex(this.logManager.getLastLogIndex() + 1);
    // Register _conf_ctx to reject configuration changing before the first log
    // is committed.
    if (this.confCtx.isBusy()) {
        throw new IllegalStateException();
    }
    this.confCtx.flush(this.conf.getConf(), this.conf.getOldConf());
    this.stepDownTimer.start();
}
```
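- Stop the vote timer.
- Switch to STATE_LEADER, set leaderId to itself, and reset the term of the replicator group.
- Add every node in the configuration other than itself (the followers) to the replicator group.
- Add every node with the Learner role to the replicator group as a learner replicator.
- Reset the ballotBox's pendingIndex to lastLogIndex + 1. The ballotBox manages the votes, that is, the per-entry replication acknowledgements.
- Call confCtx.flush with the current configuration. Per the code comment, this rejects configuration changes until the first log entry is committed.
- Start stepDownTimer.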
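A stripped-down model of the bookkeeping in becomeLeader follows. BecomeLeaderSketch and its Replicator record are hypothetical. Real replicators are long-lived objects with their own timers. Here they are only records saying who is replicated to and whether that peer is a learner.

```java
import java.util.ArrayList;
import java.util.List;

final class BecomeLeaderSketch {

    // Hypothetical: just "who do we replicate to, and is it a learner?"
    record Replicator(String peer, boolean learner) {}

    final List<Replicator> replicators = new ArrayList<>();
    long pendingIndex;

    void becomeLeader(String self, List<String> peers, List<String> learners, long lastLogIndex) {
        for (String peer : peers) {
            if (!peer.equals(self)) {
                replicators.add(new Replicator(peer, false)); // voting follower
            }
        }
        for (String learner : learners) {
            replicators.add(new Replicator(learner, true)); // receives the log but is not a voting peer
        }
        // Ballots are only created for entries appended from now on, so the first
        // index whose commit the new leader tracks is lastLogIndex + 1.
        pendingIndex = lastLogIndex + 1;
    }
}
```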
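Data write and replication

A client calls RouteTable.getInstance().selectLeader(groupId) to get the leader of the current group and builds a Request object that carries the data to write. It then writes the data to the cluster leader through the RPC client of a CliClientServiceImpl instance: getRpcClient().invokeAsync(leader.getEndpoint(), request, new InvokeCallback() {...}). On the leader, the data is wrapped in a Task and submitted with Node.apply(task). Task is defined as: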
```java
public class Task implements Serializable {

    private static final long serialVersionUID = 2971309899898274575L;

    private ByteBuffer data = LogEntry.EMPTY_DATA;
    private Closure done;
    private long expectedTerm = -1;
}
```
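data is the data we write. Closure done is a callback interface: once the data has been written successfully to a majority (half + 1) of the cluster's nodes, Closure.run(final Status status) is called.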
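The Task fields and the leader's admission checks (described in the steps that follow) can be sketched with minimal stand-ins. Status, Closure, Task and admit here are hypothetical simplifications, not JRaft's classes. The sketch assumes that expectedTerm = -1 means "accept the task in any term".

```java
import java.nio.ByteBuffer;

final class TaskClosureSketch {

    // Hypothetical stand-ins, reduced to what the text describes.
    record Status(boolean ok, String msg) {}

    @FunctionalInterface
    interface Closure {
        void run(Status status);
    }

    record Task(ByteBuffer data, Closure done, long expectedTerm) {}

    // Leader-side admission check before the task is appended to the log.
    // Returns true if accepted; on rejection, done.run(...) is called with an error.
    static boolean admit(Task task, boolean isLeader, long currTerm) {
        if (!isLeader) {
            task.done().run(new Status(false, "not leader"));
            return false;
        }
        // Assumption: expectedTerm == -1 disables the term check.
        if (task.expectedTerm() != -1 && task.expectedTerm() != currTerm) {
            task.done().run(new Status(false,
                "expected term " + task.expectedTerm() + " but current term is " + currTerm));
            return false;
        }
        return true; // done will be run later, once the entry is committed
    }
}
```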
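Node.apply(task) publishes the task to a Disruptor queue as a LogEntryAndClosure event. The event handler collects events into a batch and calls executeApplyingTasks when the batch reaches raftOptions.getApplyBatch() or when the Disruptor signals endOfBatch:

```java
public void onEvent(final LogEntryAndClosure event, final long sequence, final boolean endOfBatch)
    throws Exception {
    if (event.shutdownLatch != null) {
        if (!this.tasks.isEmpty()) {
            executeApplyingTasks(this.tasks);
            reset();
        }
        final int num = GLOBAL_NUM_NODES.decrementAndGet();
        event.shutdownLatch.countDown();
        return;
    }

    this.tasks.add(event);
    if (this.tasks.size() >= NodeImpl.this.raftOptions.getApplyBatch() || endOfBatch) {
        executeApplyingTasks(this.tasks);
        reset();
    }
}
```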
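The batching rule in onEvent does not depend on Raft and can be shown on its own. ApplyBatcherSketch is a hypothetical class. It flushes when the batch reaches applyBatch items or when the producer marks the end of a batch, which is the role the Disruptor's endOfBatch flag plays above. It omits the shutdown path.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

final class ApplyBatcherSketch<T> {

    private final int applyBatch;
    private final Consumer<List<T>> executeApplyingTasks; // hypothetical sink for a full batch
    private final List<T> tasks = new ArrayList<>();

    ApplyBatcherSketch(int applyBatch, Consumer<List<T>> executeApplyingTasks) {
        this.applyBatch = applyBatch;
        this.executeApplyingTasks = executeApplyingTasks;
    }

    // Called once per event, in queue order.
    void onEvent(T event, boolean endOfBatch) {
        tasks.add(event);
        if (tasks.size() >= applyBatch || endOfBatch) {
            executeApplyingTasks.accept(new ArrayList<>(tasks)); // hand over a copy
            tasks.clear();                                      // plays the role of reset()
        }
    }
}
```

Batching lets one executeApplyingTasks call append many entries to the log at once, instead of paying the per-call overhead for every single task.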
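executeApplyingTasks then processes the batch:

1. First check whether the current node is the leader. If it is not, call Closure.run with an error status and return.
2. If a task's expectedTerm does not match the node's current term, fail it the same way through its Closure and return.
3. Call BallotBox.appendPendingTask. This logic deserves a closer look: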
```java
public boolean appendPendingTask(final Configuration conf, final Configuration oldConf, final Closure done) {
    // Create a Ballot for every write Task and put it into pendingmetaQueue; later, as other
    // nodes replicate the entry, their acknowledgements are recorded on this Ballot.
    final Ballot bl = new Ballot();
    if (!bl.init(conf, oldConf)) {
        LOG.error("Fail to init ballot.");
        return false;
    }
    final long stamp = this.stampedLock.writeLock();
    try {
        if (this.pendingIndex <= 0) {
            LOG.error("Fail to appendingTask, pendingIndex={}.", this.pendingIndex);
            return false;
        }
        this.pendingmetaQueue.add(bl);
        this.closureQueue.appendPendingClosure(done);
        return true;
    } finally {
        this.stampedLock.unlockWrite(stamp);
    }
}
```
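Here a Ballot is created for every write task. Leader election used this same Ballot class to decide whether a majority had voted, and it plays the same role here. As other nodes replicate the leader's data for this task, the leader updates the task's Ballot. From that Ballot, the leader can tell whether a majority of the cluster has finished writing the task. At the same time, the Closure to be called once the task is written to a majority is saved in closureQueue. Whenever another node finishes writing the task, the leader updates the task's Ballot and checks whether a majority has now written it. If so, it calls the run method of that task's Closure.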
4. Call logManager.appendEntries to write the data to the local log.
5. After the local write completes, the LeaderStableClosure callback runs. Its logic is:
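```java
public void run(final Status status) {
    if (status.isOk()) {
        // The leader's own write counts as its vote for entries [firstLogIndex, firstLogIndex + nEntries - 1].
        NodeImpl.this.ballotBox.commitAt(this.firstLogIndex, this.firstLogIndex + this.nEntries - 1,
            NodeImpl.this.serverId);
    } else {
        LOG.error("Node {} append [{}, {}] failed, status={}.", getNodeId(), this.firstLogIndex,
            this.firstLogIndex + this.nEntries - 1, status);
    }
}
```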
commitAt records the peer's vote on the Ballot of each pending index from max(pendingIndex, firstLogIndex) up to lastLogIndex. The highest index whose Ballot reaches a majority becomes lastCommittedIndex. The committed Ballots are then removed from the queue, pendingIndex moves past that index, and waiter.onCommitted is notified:

```java
public boolean commitAt(final long firstLogIndex, final long lastLogIndex, final PeerId peer) {
    final long stamp = this.stampedLock.writeLock();
    long lastCommittedIndex = 0;
    try {
        if (this.pendingIndex == 0) {
            return false;
        }
        if (lastLogIndex < this.pendingIndex) {
            return true;
        }

        final long startAt = Math.max(this.pendingIndex, firstLogIndex);
        Ballot.PosHint hint = new Ballot.PosHint();
        for (long logIndex = startAt; logIndex <= lastLogIndex; logIndex++) {
            final Ballot bl = this.pendingmetaQueue.get((int) (logIndex - this.pendingIndex));
            hint = bl.grant(peer, hint);
            if (bl.isGranted()) {
                lastCommittedIndex = logIndex;
            }
        }
        if (lastCommittedIndex == 0) {
            return true;
        }
        this.pendingmetaQueue.removeFromFirst((int) (lastCommittedIndex - this.pendingIndex) + 1);
        this.pendingIndex = lastCommittedIndex + 1;
        this.lastCommittedIndex = lastCommittedIndex;
    } finally {
        this.stampedLock.unlockWrite(stamp);
    }
    this.waiter.onCommitted(lastCommittedIndex);
    return true;
}
```
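6. When a node becomes leader, it initializes the log replication group with this.replicatorGroup.addReplicator(peer). A Replicator is started for every node in the cluster other than the leader itself, along with a heartbeat timeout timer. At first, the Replicator sends the follower an empty request (EmptyEntries) to learn the follower's latest log position. Once it knows that position, it sends the log entries that need to be synchronized.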
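7. The Replicator handles the follower's response in onAppendEntriesReturned. If the follower wrote the entries successfully, the Replicator calls commitAt on the leader's BallotBox with the follower as the voting peer. This is handled the same way as in step 5.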
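Steps 3, 5 and 7 together form a per-entry majority counter. BallotBoxSketch is a hypothetical, single-threaded simplification of it. A list of vote sets replaces pendingmetaQueue and Ballot, and a LongConsumer replaces waiter.onCommitted. There is no locking and no joint-configuration support.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.LongConsumer;

final class BallotBoxSketch {

    private final List<String> conf;
    private final int quorum;
    // pending.get(i) holds the voters of log index pendingIndex + i (stands in for pendingmetaQueue).
    private final List<Set<String>> pending = new ArrayList<>();
    private final LongConsumer onCommitted; // stands in for waiter.onCommitted
    long pendingIndex;
    long lastCommittedIndex;

    BallotBoxSketch(List<String> conf, long pendingIndex, LongConsumer onCommitted) {
        this.conf = conf;
        this.quorum = conf.size() / 2 + 1;
        this.pendingIndex = pendingIndex;
        this.onCommitted = onCommitted;
    }

    // appendPendingTask: one empty ballot per new log entry, in log order.
    void appendPendingTask() {
        pending.add(new HashSet<>());
    }

    // peer reports that entries [firstLogIndex, lastLogIndex] are stored on it.
    void commitAt(long firstLogIndex, long lastLogIndex, String peer) {
        if (lastLogIndex < pendingIndex) {
            return; // all of these entries are already committed
        }
        long committed = 0;
        for (long idx = Math.max(pendingIndex, firstLogIndex); idx <= lastLogIndex; idx++) {
            Set<String> ballot = pending.get((int) (idx - pendingIndex));
            if (conf.contains(peer)) {
                ballot.add(peer);
            }
            if (ballot.size() >= quorum) {
                committed = idx; // highest index with a majority so far
            }
        }
        if (committed == 0) {
            return;
        }
        pending.subList(0, (int) (committed - pendingIndex) + 1).clear(); // like removeFromFirst
        pendingIndex = committed + 1;
        lastCommittedIndex = committed;
        onCommitted.accept(committed);
    }
}
```

Consider a three-node group. The leader's own acknowledgement of entries 1 to 3 commits nothing. Node b acknowledging entries 1 and 2 commits up to index 2. Node c then acknowledging up to 3 commits index 3.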