1. cap与base原则详解
2. nacos & zk & eureka的cap架构横向对比
3. raft协议动态图解
4. nacos集群cp架构基于raft协议源码剖析
5. nacos集群cp架构的脑裂问题
1. cap架构
cap三者并不能同时满足, 通常只能满足ap或者cp
nacos通过改变服务实例的持久化方式, 既实现了cp(ephemeral = false, 持久化实例), 也实现了ap(ephemeral = true, 临时实例), 默认是ap
3. raft协议动态图解
Raft动态演示地址: http://thesecretlivesofdata.com/raft/ 。raft中的三种角色 : leader, candidate, follower
多个节点之间, 通过投票的方式来选取leader
在项目启动时, 多个节点都处于休眠状态。最先苏醒的节点先发起投票, 此时其他节点还没有苏醒, 会将票都投给该节点, 该节点就会变成leader。如果有多个节点同时苏醒, 那么它们获得的投票数可能相同, 则所有节点重新进入休眠, 等待下一轮选举。当选举出一个leader时, leader会向所有的follower发送心跳证明自己还活着, 同时发送心跳时还会带上leader中的数据; 所有的follower根据leader发送过来的数据进行对比, 判断自己的数据是否齐全, 如果多就删除, 如果少就去leader拉数据。如果leader挂了, 剩余的follower会重新投票, 选取新的leader; 新leader选取成功后, 即使之前的leader恢复过来, 它也只是一个follower
选取流程 :
这样就选出了一个leader, 所有的写操作都是通过leader执行的。leader中写好之后, 会将数据同步到其他的follower中, 如果超过半数的节点数据同步成功, 则认为数据写入成功, 并将写入成功的信息返回给客户端。接下来看一下图解的这个过程
通过两次提交的方式, 来保证数据的一致性 , 第一次将数据发送到follower节点, 节点返回一个ack
半数以上的节点返回了ack, 则进行第二次提交
raft就此结束, 接下来看一下nacos实现的源码
nacos源码实现 :
到此为止, 我们已经讲解了nacos如何通过leader进行写数据, 将数据同步给follower,
并且讲解了如何保证一半以上节点写入成功
接下来讲解一下nacos如何选取leader的源码 :
查看RaftCore类的init()方法
进入MasterElection()run方法
到此为止, leader选举完成
接下来看一下心跳是如何发送的, 进入 new HeartBeat()run方法
到这里发送心跳完成
接下来看一下follower节点如何接收心跳 : 进入RaftController类的beat()方法, 最终会调用RaftCore的receivedBeat()方法
public RaftPeer receivedBeat(JsonNode beat) throws Exception { if (stopWork) { throw new IllegalStateException("old raft protocol already stop work"); } final RaftPeer local = peers.local(); final RaftPeer remote = new RaftPeer(); JsonNode peer = beat.get("peer"); remote.ip = peer.get("ip").asText(); remote.state = RaftPeer.State.valueOf(peer.get("state").asText()); remote.term.set(peer.get("term").asLong()); remote.heartbeatDueMs = peer.get("heartbeatDueMs").asLong(); remote.leaderDueMs = peer.get("leaderDueMs").asLong(); remote.voteFor = peer.get("voteFor").asText(); //判断发送心跳的节点是不是leader,不是则抛异常 if (remote.state != RaftPeer.State.LEADER) { Loggers.RAFT.info("[RAFT] invalid state from master, state: {}, remote peer: {}", remote.state, JacksonUtils.toJson(remote)); throw new IllegalArgumentException("invalid state from master, state: " + remote.state); } if (local.term.get() > remote.term.get()) { Loggers.RAFT .info("[RAFT] out of date beat, beat-from-term: {}, beat-to-term: {}, remote peer: {}, and leaderDueMs: {}", remote.term.get(), local.term.get(), JacksonUtils.toJson(remote), local.leaderDueMs); throw new IllegalArgumentException( "out of date beat, beat-from-term: " + remote.term.get() + ", beat-to-term: " + local.term.get()); } if (local.state != RaftPeer.State.FOLLOWER) { Loggers.RAFT.info("[RAFT] make remote as leader, remote peer: {}", JacksonUtils.toJson(remote)); // mk follower local.state = RaftPeer.State.FOLLOWER; local.voteFor = remote.ip; } //获取leader节点的所有key final JsonNode beatDatums = beat.get("datums"); local.resetLeaderDue(); local.resetHeartbeatDue(); peers.makeLeader(remote); if (!switchDomain.isSendBeatonly()) { MapreceivedKeysMap = new HashMap<>(datums.size()); //获取本节点拥有的key的集合,将所有的key放入到map中 //key : key集合, value : 魔法值0 代表这个key是本节点中有的, 接下来这个value值会在删除key时起作用 for (Map.Entry entry : datums.entrySet()) { receivedKeysMap.put(entry.getKey(), 0); } // now check datums List batch = new ArrayList<>(); int processedCount = 0; //遍历leader的所有key for 
(Object object : beatDatums) { processedCount = processedCount + 1; JsonNode entry = (JsonNode) object; String key = entry.get("key").asText(); final String datumKey; if (KeyBuilder.matchServicemetaKey(key)) { datumKey = KeyBuilder.detailServicemetaKey(key); } else if (KeyBuilder.matchInstanceListKey(key)) { datumKey = KeyBuilder.detailInstanceListkey(key); } else { // ignore corrupted key: continue; } long timestamp = entry.get("timestamp").asLong(); //如果leader和本节点都有的值, 对应的value为1 receivedKeysMap.put(datumKey, 1); try { if (datums.containsKey(datumKey) && datums.get(datumKey).timestamp.get() >= timestamp && processedCount < beatDatums.size()) { continue; } //batch中的数据就是leader节点包含,本节点不包含的key, 获取本届点中也包含但是时间戳小于leader节点的 //batch中的key就是本节点需要到leader中拉取的key if (!(datums.containsKey(datumKey) && datums.get(datumKey).timestamp.get() >= timestamp)) { batch.add(datumKey); } //如果比leader少一个key并不会试时去拉取, 到达一定数量时才回去拉取 if (batch.size() < 50 && processedCount < beatDatums.size()) { continue; } //拼接所有缺失的key String keys = StringUtils.join(batch, ","); if (batch.size() <= 0) { continue; } Loggers.RAFT.info("get datums from leader: {}, batch size is {}, processedCount is {}" + ", datums' size is {}, RaftCore.datums' size is {}", getLeader().ip, batch.size(), processedCount, beatDatums.size(), datums.size()); //拼接leader的url, 去leader拉取数据 String url = buildUrl(remote.ip, API_GET); Map queryParam = new HashMap<>(1); //将上面的keys进行封装, 封装到queryParam中, queryParam作为请求的参数 queryParam.put("keys", URLEncoder.encode(keys, "UTF-8")); HttpClient.asyncHttpGet(url, null, queryParam, new Callback () { @Override public void onReceive(RestResult result) { if (!result.ok()) { return; } List datumList = JacksonUtils .toObj(result.getData(), new TypeReference >() { }); //本节点补充缺失的数据 for (JsonNode datumJson : datumList) { Datum newDatum = null; OPERATE_LOCK.lock(); try { Datum oldDatum = getDatum(datumJson.get("key").asText()); if (oldDatum != null && datumJson.get("timestamp").asLong() <= oldDatum.timestamp 
.get()) { Loggers.RAFT .info("[NACOS-RAFT] timestamp is smaller than that of mine, key: {}, remote: {}, local: {}", datumJson.get("key").asText(), datumJson.get("timestamp").asLong(), oldDatum.timestamp); continue; } if (KeyBuilder.matchServicemetaKey(datumJson.get("key").asText())) { Datum
serviceDatum = new Datum<>(); serviceDatum.key = datumJson.get("key").asText(); serviceDatum.timestamp.set(datumJson.get("timestamp").asLong()); serviceDatum.value = JacksonUtils .toObj(datumJson.get("value").toString(), Service.class); newDatum = serviceDatum; } if (KeyBuilder.matchInstanceListKey(datumJson.get("key").asText())) { Datum instancesDatum = new Datum<>(); instancesDatum.key = datumJson.get("key").asText(); instancesDatum.timestamp.set(datumJson.get("timestamp").asLong()); instancesDatum.value = JacksonUtils .toObj(datumJson.get("value").toString(), Instances.class); newDatum = instancesDatum; } if (newDatum == null || newDatum.value == null) { Loggers.RAFT.error("receive null datum: {}", datumJson); continue; } raftStore.write(newDatum); datums.put(newDatum.key, newDatum); notifier.notify(newDatum.key, DataOperation.CHANGE, newDatum.value); local.resetLeaderDue(); if (local.term.get() + 100 > remote.term.get()) { getLeader().term.set(remote.term.get()); local.term.set(getLeader().term.get()); } else { local.term.addAndGet(100); } raftStore.updateTerm(local.term.get()); Loggers.RAFT.info("data updated, key: {}, timestamp: {}, from {}, local term: {}", newDatum.key, newDatum.timestamp, JacksonUtils.toJson(remote), local.term); } catch (Throwable e) { Loggers.RAFT .error("[RAFT-BEAT] failed to sync datum from leader, datum: {}", newDatum, e); } finally { OPERATE_LOCK.unlock(); } } try { TimeUnit.MILLISECONDS.sleep(200); } catch (InterruptedException e) { Loggers.RAFT.error("[RAFT-BEAT] Interrupted error ", e); } return; } @Override public void onError(Throwable throwable) { Loggers.RAFT.error("[RAFT-BEAT] failed to sync datum from leader", throwable); } @Override public void onCancel() { } }); //清空batch batch.clear(); } catch (Exception e) { Loggers.RAFT.error("[NACOS-RAFT] failed to handle beat entry, key: {}", datumKey); } } List deadKeys = new ArrayList<>(); //如果value还有为0的, 则说明leader中没有这个key,而本节点中有这个key, 可能是leader中删除了, 则需要在本节点删除这个key for (Map.Entry 
entry : receivedKeysMap.entrySet()) { if (entry.getValue() == 0) { deadKeys.add(entry.getKey()); } } for (String deadKey : deadKeys) { try { deleteDatum(deadKey); } catch (Exception e) { Loggers.RAFT.error("[NACOS-RAFT] failed to remove entry, key={} {}", deadKey, e); } } } return local; }
源码讲解完毕
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)