Leader负责进行投票的发起和决议,更新系统状态;
处理客户端的读写请求(为了保证最终一致性,所有写请求都由Leader处理);
Follwer用于接收客户端的请求并向客户端返回结果;
Follwer在选主的过程中参与投票。
Observer接收客户端的请求,并将写请求转发给Leader;
Observer不参加投票过程,只同步Leader的状态;
Observer的目的是为了扩展系统,提高系统读取速度。
分布式系统中的节点通信存在两种模型:共享内存(Shared memory)和消息传递(Messages passing)。
Paxos 算法是莱斯利•兰伯特(英语:Leslie Lamport)于 1990 年提出的一种基于消息传递且
具有高度容错特性的一致性算法。
ZooKeeper 的选举算法有两种:一种是基于 Basic Paxos(Google Chubby 采用)实现的,另外
一种是基于 Fast Paxos(ZooKeeper 采用)算法实现的。系统默认的选举算法为 Fast Paxos。
Paxos算法的原理可查看下面的链接:
分布式一致性算法——Paxos原理与推导过程 - 简书
ZooKeeper 的核心是原子广播,这个机制保证了各个 Server 之间的同步。实现这个机制的协 议叫做 ZAB 协议(Zookeeper Atomic BrodCast)。
ZAB 协议有两种模式,它们分别是崩溃恢复模式(选主)和原子广播模式(同步)。
1)当服务启动或Leader挂了后,ZAB进入恢复模式。Leader被选举出来,且大部分server完成了和Leader的状态同步后结束恢复模式。状态同步保证了Leader和Follower具有相同的系统状态
2)退出恢复模式后进入原子广播模式。此时,所有的写请求都被转发给Leader,再有Leader将更新提议(proposal)广播给Follower
3、Zookeeper集群选主过程Zookeeper server 有三种工作状态:
1)Looking:当前Server不知道Leader是谁,处于观望状态,搜寻并进行选举
2)Leading:当前Server即为选举出来的Leader
3)Following:Leader已经被选举出来,当前Server与其同步
3.1 全新集群选主通过 serverId 选主。标准:过半选举
3.2 非全新集群选主假设集群中非 Observer 节点共有5个,分别是:
hadoop01、hadoop02、hadoop03、hadoop04、hadoop05
集群启动时节点启动顺序:
hadoop02、hadoop03、hadoop01、hadoop04、hadoop05
由于集群是新启动,没有历史数据,所以选举的依据只有serverId,规则:serverId小的会将票投给大的。假设集群的serverId就是机器名称的编号。
1、hadoop02启动。发起选举并投自己一票,此时集群中没有相应,hadoop02处于Looking
2、hadoop03启动,它与hadoop01通信,交换投票结果。由于hadoop02的serverId小于hadoop03,hadoop03胜出。但由于没有超过半数,两个节点都出与Looking状态
3、hadoop01启动,hadoop03获得三票超过半数,hadoop03成为Leader
4、hadoop04、hadoop05启动发现集群中存在Leader,进入Following状态。
对于已经运行过一段时间的集群,如果Leader挂掉了会面临重新选主的问题。由于Zookeeper保证的是数据的最终一致性,在某一时间点上多个节点的数据可能是不同步的,此时不能仅依靠serverId进行选主,需要加入version和逻辑时钟。
1、逻辑时钟:每个节点维护自己的逻辑时钟,每次投票后递增,即逻辑时钟=当前节点的投票次数。逻辑时钟越大,说明这次选举的进程越新,即每次选举拥有一个zxid,投票结果只取zxid最大的。
2、数据version:zxid全局唯一且顺序递增,代表着事件的提交顺序。zxid越大,代表数据版本越新
3、serverId:配置Zookeeper集群时myid中的值,每个机器一个
非全新集群选主依据:
4. 数据同步1、逻辑时钟小的选举结果被忽略,重新投票
2、同一逻辑时钟后,数据version大的胜出
3、数据version相同时,serverId大的胜出
根据这个规则选举出来的Leader一定拥有集群中最新的数据
非全新集群重新选主后会面临数据不一致问题,此时需要数据同步。
选举完成后,集群进入数据同步状态,此时集群不对外提供服务。
5、Zookeeper写数据流程1、Follower连接Leader,将最大的zxid发送给Leader
2、Leader根据根据zxid返回给Follower同步点
3、Follower进行数据同步并将同步完成消息发送给Leader
4、Leader将Follower状态修改为Updated,此时Follower可以对外提供服务
6、Zookeeper工作流程 6.1 Leader工作流程1、Follower接收写请求后将请求转发给Leader
2、Leader处理写请求,处理完成后发送数据更新命令
3、Follower处理更新命令,完成后向Leader发出相应
4、Leader接收到一半以上的节点更新完毕的相应,则任务更新完毕。未更新完毕的节点在此期间不会对外提供服务,直至更新完成
写请求成功标准:过半节点更新成功
6.2 Follower工作流程Leader主要有三个功能:
1、恢复数据
2、维持与Learner的心跳,接收Learner的请求并判断Learner的请求类型
Learner的消息类型:
PING:Learner的心跳消息
REQUEST:Folower发送的提议信息,包括读写请求
ACK:Follower对proposal的回复,过半则commit该请求
RevalIDATE:用于延长session的有效时间
3、根据不同消息类型,进行不同的处理
6.3 Observer工作流程Follower主要有四个功能:
1、向Leader发送消息(PING、REQUEST、ACKRevalIDATA)
2、接收Leader命令并处理
3、接收Client的请求,读请求直接处理,写请求转发给Leader
4、返回Client结果
二、Zookeeper应用案例 1、服务器上下线动态感知Observer不会参与投票,也不会被选举为Leader。其它功能与Follower相同
1.1 需求描述
分布式系统中有多个节点,可以动态上下线,任何一个节点都能实时感知其他节点的上下线
1.2 设计思路
1、设计服务器端存储服务器上下线信息,例如都写入到servers节点下
2、设计客户端监听servers节点,获取该集群在线节点列表
3、节点上线时,在Zookeeper的servers目录下创建一个临时节点
4、服务器下线时临时节点自动删除,此时监听机制会告知客户端有节点变化
服务器端
public class Server { private static final String CONNECT_STRING = "hadoop01:2181"; private static final int SESSION_TIMEOUT = 3000; private static final String PARENT_PATH = "/servers"; static ZooKeeper zk = null; public static void main(String[] args) throws IOException, KeeperException, InterruptedException { Server server = new Server(); server.getZkConnect(); server.registerServer("server1"); Thread.sleep(Long.MAX_VALUE); } // 拿到Zookeeper进群的链接 public void getZkConnect() throws IOException { zk = new ZooKeeper(CONNECT_STRING, SESSION_TIMEOUT, new Watcher() { @Override public void process(WatchedEvent watchedEvent) { } }); } public void registerServer(String hostname) throws KeeperException, InterruptedException { Stat exists = zk.exists(PARENT_PATH, null); if (exists == null) { // 父目录创建永久节点,临时节点无法创建子节点 zk.create(PARENT_PATH, "parent_path".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } // 上线创建临时节点,下线自动删除 zk.create(PARENT_PATH + "/" + hostname, hostname.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); System.out.println(hostname + " is working..."); } }
客户端
public class Client { private static final String CONNECT_STRING = "hadoop01:2181"; private static final int SESSION_TIMEOUT = 3000; private static final String PARENT_PATH = "/servers"; static ZooKeeper zk = null; private static Listnodes = new ArrayList (); public static void main(String[] args) throws IOException, InterruptedException { Client client = new Client(); client.getZkConnect(); Thread.sleep(Long.MAX_VALUE); } public void getZkConnect() throws IOException { zk = new ZooKeeper(CONNECT_STRING, SESSION_TIMEOUT, new Watcher() { @SneakyThrows @Override public void process(WatchedEvent watchedEvent) { // 获取所有在线节点列表 List children = zk.getChildren(PARENT_PATH, true); // nodes存储在线节点,当监听触发时对比节点列表找到问题节点 if (children.size() > nodes.size()) { for (String child : children) { if (!nodes.contains(child)) { nodes.add(child); System.out.println(child + " is online"); } } } else { for (String node : nodes) { if (!children.contains(node)) { nodes.remove(node); System.out.println(node + " is dying"); } } } } }); System.out.println("client is working..."); } }
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)