- 前言
- 1、节点角色
- 2、选举过程
- 2.1 胜出的条件
- 2.2 比较的规则
- 3、代码逻辑综述
- 4、源码分析
- 结语
Zookeeper作为Dubbo生态的默认注册中心,得到了非常的普遍的应用,虽然后来阿里又出了nacos,但是不可否认的是ZK仍然是一款非常优秀的开源产品,非常优秀的注册中心备选方案。
ZK有很多特性,本篇文章主要介绍ZK的选主过程(后宫佳丽三千,我就独宠你一人)
1、节点角色要说选主的过程,我们首先得了解ZK到底有哪些节点,这些节点充当得角色是什么?
ZK本身得节点主要分为三类:
Leader:主要是负责数据的写入,如果超过半数同意,那么就广播进行写入;
Follower:主要负责查询请求并将写入请求发送给leader,参与选举和写入投票;
Observe:也是负责查询请求并将写入请求发送给leader,不参加投票,只被动接收结果
获取半数投票以上的节点成为leader节点。
2.2 比较的规则万事万物都有一个准则,好的比较坏的,坏的比较更坏的,世上本没有痛苦,痛苦都是自己寻找的结果,海燕你可长点心吧,哎呀跑偏了。
ZK比较的时候有三个指标或者三个维度:
(1)任期
(2)事务ID(ZK中的事务ID)
(3)节点编号(集群中每个节点的编号)
根据以上三个指标就可以说出最终的结论了:选择任期大的,任期一样选择事务ID大的,前两个都一样,选择节点编号大的。
就这么简单?是的。规则就是这么简单,但是源码还是有那么一丢丢的绕。
源码看着相对比较枯燥,但是作为一个手艺人,怎么能不去了解怎么做的呢,我们先来梳理一下代码的流程,方便更好的看第四部分内容。
- 节点先投自己一票,并进行广播
- 节点内部循环进行消息接收
- 收到消息后
如果消息为空,就进行重新发送消息或者建立连接
如果消息不为空,且消息接收者和投票的leader都是合法节点就进行下边步骤。 - 如果节点为looking节点
根据当前节点的投票和接收到的投票进行比较来决定是否需要再次发送投票并且记录投票的结果 - 每次都判断记录的票数,如果过半就进行节点状态的设置
选主的逻辑是在lookForLeader开始的,像金字塔的第一块砖一样,我们先看ZK选主的第一块砖lookForLeader,第一次看源码得时候一定要把握主线,忽略从线,等主线完全理清楚了之后才去处理从线,要不会陷入迷宫之中。
下边就是主要的投票代码,看里边的注释:
public Vote lookForLeader() throws InterruptedException { //省略代码 synchronized (this) { //1、投票轮数加一 logicalclock.incrementAndGet(); //2、更新提议或者叫投票,三个参数:当前节点的id、ZK的最大事务id、任期 updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch()); } LOG.info("New election. My id = {}, proposed zxid=0x{}",self.getId(),Long.toHexString(proposedZxid)); //发送通知 sendNotifications(); //省略后边代码 }
更新投票或者投票的方法为:
synchronized void updateProposal(long leader, long zxid, long epoch) { proposedLeader = leader; proposedZxid = zxid; proposedEpoch = epoch; }
发送通知的方法为:
private void sendNotifications() { for (long sid : self.getCurrentAndNextConfigVoters()) { QuorumVerifier qv = self.getQuorumVerifier(); //初始化待发送通知或者投票的报文、对象 ToSend notmsg = new ToSend( ToSend.mType.notification, proposedLeader, //投的leader proposedZxid, //事务ID logicalclock.get(), //投票或者选举的轮数 QuorumPeer.ServerState.LOOKING, sid, proposedEpoch, //任期 qv.toString().getBytes(UTF_8)); sendqueue.offer(notmsg); } }
待到山花烂漫时,她在丛中笑,消息都已经发完了,肯定就到了接收到选票的时候应该怎么 *** 作了,接收选票的代码也是在lookForLeader中:
//在服务没有停止并且当前节点的状态是LOOKING时进行while循环 while ((self.getPeerState() == ServerState.LOOKING) && (!stop)) { //200毫秒内有数据就立马取出 Notification n = recvqueue.poll(notTimeout, TimeUnit.MILLISECONDS); //当取出的数据为空的时候 if (n == null) { //判断是否所有的投票都已经发送,如果发送完毕就重新发送 if (manager.haveDelivered()) { sendNotifications(); //如果没有发送完毕就建立连接 } else { manager.connectAll(); } notTimeout = Math.min(notTimeout << 1, maxNotificationInterval); //self.getQuorumVerifier().revalidateVoteset(voteSet, notTimeout != minNotificationInterval) //上面一行代码默认返回false,所以下边的if判断永远不可能进入,当没有收到选票时就等待200ms后继续循环,即空转一轮。 if (self.getQuorumVerifier() instanceof QuorumOracleMaj && self.getQuorumVerifier().revalidateVoteset(voteSet, notTimeout != minNotificationInterval)) { setPeerState(proposedLeader, voteSet); Vote endVote = new Vote(proposedLeader, proposedZxid, logicalclock.get(), proposedEpoch); leaveInstance(endVote); return endVote; } LOG.info("Notification time out: {} ms", notTimeout); //当接收到的消息不为空,并且是接收者和待选leader都是有效节点时进入当前逻辑 } else if (validVoter(n.sid) && validVoter(n.leader)) { //先查看投票节点的状态 switch (n.state) { case LOOKING: case OBSERVING: case FOLLOWING: case LEADING: default: LOG.warn("Notification state unrecognized: {} (n.state), {}(n.sid)", n.state, n.sid); break; } } else { if (!validVoter(n.leader)) { LOG.warn("Ignoring notification for non-cluster member sid {} from sid {}", n.leader, n.sid); } if (!validVoter(n.sid)) { LOG.warn("Ignoring notification for sid {} from non-quorum member sid {}", n.leader, n.sid); } } }
接上代码继续讨论,校验发送投票节点的状态,我们从本文的第一章节知道Observe节点是不参与投票的,只是转发写请求和被动接收数据,负责查询请求,所以从代码中我们也可以看出来:
case OBSERVING: LOG.debug("Notification from observer: {}", n.sid); break;
当发送投票的节点状态是FOLLOWING和LEADING时,代表发送节点已经选举完成,所以处理方法的逻辑都是一样滴,这部分限于篇幅太长,暂时就不深入讨论了,感兴趣的朋友可以私信我或者加我微信号M_P_E_D进行交流和沟通,请备注CSDN
case FOLLOWING: Vote resultFN = receivedFollowingNotification(recvset, outofelection, voteSet, n); if (resultFN == null) { break; } else { return resultFN; } case LEADING: Vote resultLN = receivedLeadingNotification(recvset, outofelection, voteSet, n); if (resultLN == null) { break; } else { return resultLN; }
终于到重头戏了,咱们看看LOOKING状态时的代码:
我们先把totalOrderPredicate方法放前边,这个其实就是选举leader的规则的实现。
protected boolean totalOrderPredicate(long newId, long newZxid, long newEpoch, long curId, long curZxid, long curEpoch) { if (self.getQuorumVerifier().getWeight(newId) == 0) { return false; } return ((newEpoch > curEpoch) || ((newEpoch == curEpoch) && ((newZxid > curZxid) || ((newZxid == curZxid) && (newId > curId))))); }
case LOOKING: //当前节点事务id为-1或者发送节点的事务id为-1时,忽略通知 if (getInitLastLoggedZxid() == -1) { LOG.debug("Ignoring notification as our zxid is -1"); break; } if (n.zxid == -1) { LOG.debug("Ignoring notification from member with -1 zxid {}", n.sid); break; } //如果发送节点的轮数大于当前节点的轮数 ,证明当前投票落后了 if (n.electionEpoch > logicalclock.get()) { //就把当前节点的投票轮数设置成和发送节点轮数一样 logicalclock.set(n.electionEpoch); //并且清空收到的选票,落后之后选票就无效了 recvset.clear(); if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) { updateProposal(n.leader, n.zxid, n.peerEpoch); } else { updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch()); } sendNotifications(); //如果当前节点的选举轮数比发送的大,证明接收到的选票无效 } else if (n.electionEpoch < logicalclock.get()) { LOG.debug( "Notification election epoch is smaller than logicalclock. n.electionEpoch = 0x{}, logicalclock=0x{}", Long.toHexString(n.electionEpoch), Long.toHexString(logicalclock.get())); break; //如果选举轮数一样,就比较发送节点的选票和当前节点的选票 //按照规则发送节点的选择的leader胜出,就修改当前节点的选票为 //发送节点一样,然后发送通知 } else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, proposedLeader, proposedZxid, proposedEpoch)) { updateProposal(n.leader, n.zxid, n.peerEpoch); sendNotifications(); } //该发的通知发送了,该比较的都比较清楚了, //但是注意上边的代码,只有在选票不一致的时候才再次 //进行投票,如果一样的话就不再进行投票了。 //下边是把投票信息记录下来,为什么?因为要比较HALF,过半就选出 //leader了,别一直忙活呀 recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch)); //该方法的作用是选的leader的一样的节点加在一样,为了比较是否过半 voteSet = getVoteTracker(recvset, new Vote(proposedLeader, proposedZxid, logicalclock.get(), proposedEpoch)); //该方法就是比较选择同样leader的节点是否过半,如果过半就进入 //进行状态设置 if (voteSet.hasAllQuorums()) { //这块并没有立马设置,而是又等了一轮(200ms),然后在这个200ms //内又接收到新的投票,并且新投票的选择的leader节点胜出,那么就 //把这张投票放进队列中,进行下次循环,并跳出当前的计票或者选主流程 while ((n = recvqueue.poll(finalizeWait, TimeUnit.MILLISECONDS)) != null) { if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, proposedLeader, proposedZxid, proposedEpoch)) { recvqueue.put(n); break; } } //如果200ms后没有收到新的投票,那么就进行节点状态的设置, //proposedLeader与当前节点id一样,当前节点就是leader,否则依据 //是否为PARTICIPANT来判读为fllow或者observe节点 if (n == null) { setPeerState(proposedLeader, voteSet); Vote endVote = new Vote(proposedLeader, proposedZxid, logicalclock.get(), proposedEpoch); leaveInstance(endVote); return endVote; } } break;结语
道阻且长,行则将至,行而不辍,未来可期,加油。
原创不易,肝了3夜才写完,如果你觉得文章不错,对你的进步有那么一点帮助,那么就给个小心心,如果觉得文章非常对你的胃口,那么欢迎你关注我,或者关注个人的微信公众号,咱们一起打怪升级。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)