Zookeeper 视频学习

Zookeeper 视频学习,第1张

Zookeeper 视频学习 1、ZooKeeper 入门 1.1、概述

Zookeeper 是一个开源的分布式的、为分布式框架提供协调服务的 Apache 项目。

Zookeeper 工作机制

Zookeeper 从设计模式角度来理解:是一个基于观察者模式设计的分布式服务管理框架,它负责存储和管理大家都关心的数据,然后接收观察者的注册,一旦这些数据的状态发生变化,Zookeeper 就将负责通知已经在 Zookeeper上注册的那些观察者做出相应的反应。

如果不了解观察者模式的伙伴可以取看看这篇博客:JAVA设计模式 ———— 观察者模式

2、Zookeeper 的特点

思考:为什么 Zookeeper 服务器只能由一个 Leader?

首先,绝大多数业务,读比写的 *** 作多,而且相差的不是一星半点而是几个数量级。

就是新增、修改、删除的次数要远远比查询的次数少,所以负责写的节点的压力一定是比负责读的节点的压力小。

还有就是,Zookeeper 其实并不是 Leader 负责写,而是由 Leader 负责通知其他 Follower 写。每个 Follower 也是可以接收写请求的,只不过自己先不执行写,而是要转发给 Leader,等 Leader 通知它写完了,才开始写。这一点跟其他一些读写分离方案不太一样,比如 Mysql 读写分离的只读节点压根就不接受写请求。

思考:为什么 Zokeeper 服务器适合安装奇数台服务器?

首先要明确一点,任意台 Zookeeper 服务器都能够部署且正常运转。Zookeeper 官方建议部署奇数台服务器,其实是基于节约资源的角度来考虑的。

Zookeeper 默认采用 Quoryms 组件来处理集群的脑裂问题。Quoryms 的原则就算过半存活即可用。

假如我们需要搭建一个最大允许 N 台服务器宕机的 Zookeeper 集群,那么我们最少需要部署 2N+1 台服务器。也就是说,对于一个有 2N+1 台服务器的 Zookeeper 集群来说,最大能接受 N 台服务器宕机。那么,我部署 2N+2 台服务器也可以满足你的要求。是的,2N+1 只是一个最优解。Zookeeper 官方建议部署奇数台服务器,就是从资源利用率的角度来说的。毕竟,服务器资源的成本是比较大的,能少部署一台,就能节约一台的成本。

1.3、数据结构

Zookeeper 数据模型的结构与 Unix 文件系统很类似,整体上可以看作是一棵树,每个节点称作一个 ZNode。每一个 ZNode 默认能够存储 1MB 的数据,每个 ZNode 都可以通过其路径唯一标识。

1.4、应用场景

提供的服务包括:统一命名服务、同于配置管理、统一集群管理、服务器节点动态上下线、软负载均衡。

在分布式环境下,经常需要对应用 / 服务进行统一命名,便于识别。

1.4.1、统一命名服务

例如:IP 不容易记住,而域名容易记住。

1.4.2、统一配置管理

分布式环境下,配置文件同步非常常见。

  • 一般要求一个集群中,所有节点的配置信息是一致的,比如 Kafka 集群和 Hadoop 集群。
  • 对配置文件修改后,希望能够快速同步到各个节点上。
  • 配置管理可以交由 Zookeeper 实现
    • 可将配置信息写入 Zookeeper 上的一个 ZNode 节点中。
    • 各个客户端监听这个 ZNode。
    • 一旦 ZNode 中的数据被修改,Zookeeper 将通知各个客户端服务器。

1.4.3、统一集群管理
  • 分布式环境中,实时掌握每个节点的状态是必要的。
    • 可以根据节点实时状态做出一些调整。
  • Zookeeper 可以实现监控节点状态变化
    • 可将节点信息写入 Zookeeper 上的一个 ZNode。
    • 监听这个 ZNode 可以获取它的实时状态变化。

1.4.4、服务器动态上下线

1.4.5、软负载均衡

在 Zookerrper 中记录每台服务器的访问数,让访问数最少的服务器取处理最新的客户端请求。

实现思路:

  • 将 Zookeeper 作为服务的注册中心,所有服务在启动时向注册中心登录自己能够提供的服务。
  • 服务的调用者到注册中心获取能够提供所需要服务的服务器列表,然后自己根据负载均衡算法,从中选取一台服务器进行连接
  • 当服务器列表发生变化的时候,如:某台服务器宕机下线,或者新机器加入,Zookeeper 会自动通知调用者重新获取服务列表。

安装 Zookeeper

因为我是阿里云服务器,所以直接采用 docker 进行安装了,比较方便。

# 下载镜像
[root@iZ2ze0jazgwtuvi1lczdqfZ ~]# docker pull zookeeper:3.7
latest: Pulling from library/zookeeper
eff15d958d66: Pull complete 
66aa43e8673f: Pull complete 
089381f525cd: Pull complete 
c9594f4373c2: Pull complete 
b0fe3d8db030: Pull complete 
b2fdab4981df: Pull complete 
9e187a3301cc: Pull complete 
cb7e3b60859d: Pull complete 
Digest: sha256:9580eb3dfe20c116cbc3c39a7d9e347d2e34367002e2790af4fac31208e18ec5
Status: Downloaded newer image for zookeeper:3.7
docker.io/library/zookeeper:3.7

# 在主机上建立挂载目录和zookeeper配置文件
mkdir -p /home/data/zookeeper_data/conf 
mkdir -p /home/data/zookeeper_data/data 

# 启动容器
docker run -d --name zookeeper -p 2181:2181 --restart always zookeeper:3.7

# 端口讲解
1.2181:对client客户端提供服务
2.3888:选举leader使用
3.2888:集群内机器通讯使用(Leader监听此端口)

# 参数解读
# --restart always 重启Docker的时候容器也进行重启

# 查看当前zookeeper服务器的状态
docker inspect 容器id

# 进入容器
docker exec -it 容器id /bin/bash

# 客户端连接zookeeper
root@d69dc9b1ab13:/apache-zookeeper-3.7.0-bin/bin# ./zkCli.sh
Connecting to localhost:2181
2021-12-02 08:36:08,550 [myid:] - INFO  [main:Environment@98] - Client environment:zookeeper.version=3.7.0-e3704b390a6697bfdf4b0bef79e3da7a4f6bac4b, built on 2021-03-17 09:46 UTC
root@d69dc9b1ab13:/apache-zookeeper-3.7.0-bin/bin# ls /
[zookeeper]
root@d69dc9b1ab13:/apache-zookeeper-3.7.0-bin/bin# quit
配置文件 配置项含义说明tickTime = 2000通信心跳时间维持心跳的时间间隔,单位是毫秒initLimit = 10初始通信时限用于zookeeper集群,此时有多台zookeeper服务器,其中一个为Leader,其他都为FollowersyncLimit = 5同步通信时限在运行时Leader通过心跳检测与Follower进行通信,如果超过syncLimit * tickTime 时间还为收到响应,则认为该Follower已经宕机dataDir = …/data存储数据的目录数据文件也称为snapshot快照文件clientPort = 2181端口号默认为 2181maxClientCnxns = 60单个客户端的最大连接数限制默认为60,可以设置为0,表示没有限制autopurge.snapRetainCount = 3保留文件的数量默认为3个autopurge.purgeInterval = 1自动清理快照文件和事务日志的频率默认为0,表示不开启自动清理,单位是小时dataLogDir =存储日志的目录未指定时日志文件也存放在dataDir中,为了性能最大化,一般建议把dataDir和dataLogDir 分别放到不同的磁盘上 客户端常用命令 命令作用说明help查看帮助查看所有 *** 作命令ls 节点路径查看指定节点下的内容ls -s 节点路径查看指定节点的详细信息查看所有子节点和当前节点的状态create 节点路径 内容创建普通节点如果内容中有空格,则需要使用对双引号引起来get 节点路径获取节点中的值create -e 节点路径 内容创建临时节点当连接断开后,节点会被自动删除create -s 节点路径 内容创建顺序编号节点即带序号的节点delete 节点路径删除节点只能删除空节点,即不能有子节点rmr 节点路径递归删除节点递归删除stat 节点路径查看节点状态set 节点路径 新值修改节点内容

顺序编号节点:

  • 顺序编号会紧跟在节点名称后面,节点最终名称为:**节点名 + 序号 **,如/test0000000005
  • 顺序编号是一个递增的计数器
  • 顺序编号是由父节点维护,从已有子节点个数开始(包括临时节点和被删除的节点)
  • 如果子节点为空,则从 0000000000 开始,依次递增 1
  • 在分布式系统中,顺序编号可以被用于为所有的事件进行全局排序,这样客户端就可以根据序号推断事件的顺序
Zookeeper 集群

首先,启动三个 zookeeper 容器。这里涉及到一个问题,就是 docker 容器之间的通信问题。

Docker 有三种网络模式,bridge、host、none,在你创建容器的时候,不指定 --network 默认就是 bridge。

bridge:为每一个容器分配 IP,并将容器连接到一个 docker0 的虚拟网桥上,通过 docker0 网桥与宿主机通信。就是说,此模式下,你不能使用宿主机的 IP + 容器映射端口来进行 Docker 容器之间的通信。

host:容器不会虚拟自己的网卡,配置自己的 IP,而是使用宿主机的 IP 和端口,这样以来,Docker 容器之间的通信就可以用宿主机的 IP + 容器映射端口

none:无网络

# 现在本地创建目录
[root@iZ2ze0jazgwtuvi1lczdqfZ home]# mkdir zookeeper
[root@iZ2ze0jazgwtuvi1lczdqfZ home]# cd zookeeper/
[root@iZ2ze0jazgwtuvi1lczdqfZ zookeeper]# mkdir /usr/local/zookeeper-cluster
[root@iZ2ze0jazgwtuvi1lczdqfZ zookeeper]# mkdir /usr/local/zookeeper-cluster/node1
[root@iZ2ze0jazgwtuvi1lczdqfZ zookeeper]# mkdir /usr/local/zookeeper-cluster/node2
[root@iZ2ze0jazgwtuvi1lczdqfZ zookeeper]# mkdir /usr/local/zookeeper-cluster/node3

# 为了方便容器之间的通信,这里我们采用自定义网络
[root@iZ2ze0jazgwtuvi1lczdqfZ zookeeper]# docker network create --driver bridge --subnet=192.18.0.0/16 --gateway=192.168.0.1 mynet

# docker 容器创建命令
docker run -d -p 2181:2181 --name zookeeper_node1 --privileged --restart always --network mynet --ip 192.168.0.2 -v /usr/local/zookeeper-cluster/node1/volumes/data:/data -v /usr/local/zookeeper-cluster/node1/volumes/datalog:/datalog -v /usr/local/zookeeper-cluster/node1/volumes/logs:/logs -e ZOO_MY_ID=1 -e "ZOO_SERVERS=server.1=192.168.0.2:2888:3888;2181 server.2=192.168.0.3:2888:3888;2181 server.3=192.168.0.4:2888:3888;2181" zookeeper:3.5.7

docker run -d -p 2182:2181 --name zookeeper_node2 --privileged --restart always --network mynet --ip 192.168.0.3 -v /usr/local/zookeeper-cluster/node2/volumes/data:/data -v /usr/local/zookeeper-cluster/node2/volumes/datalog:/datalog -v /usr/local/zookeeper-cluster/node2/volumes/logs:/logs -e ZOO_MY_ID=2 -e "ZOO_SERVERS=server.1=192.168.0.2:2888:3888;2181 server.2=192.168.0.3:2888:3888;2181 server.3=192.168.0.4:2888:3888;2181" zookeeper:3.5.7

docker run -d -p 2183:2181 --name zookeeper_node3 --privileged --restart always --network mynet --ip 192.168.0.4 -v /usr/local/zookeeper-cluster/node3/volumes/data:/data -v /usr/local/zookeeper-cluster/node3/volumes/datalog:/datalog -v /usr/local/zookeeper-cluster/node3/volumes/logs:/logs -e ZOO_MY_ID=3 -e "ZOO_SERVERS=server.1=192.168.0.2:2888:3888;2181 server.2=192.168.0.3:2888:3888;2181 server.3=192.168.0.4:2888:3888;2181" zookeeper:3.5.7

通过 docker-compose 文件来搭建 zookeeper 集群

# 创建docker-compose.yml文件
version: '3' # 版本号固定写法
services:
    zoo1:
        image: zookeeper:3.5.7 #使用的镜像
        restart: always #宕机后自动重启
        hostname: zoo1 #承载zookeeper容器的主机(父容器)名 可省略
        container_name: zoo1 #容器名
        privileged: true #使用该参数,container内的root拥有真正的root权 privileged启动的容器,可以看到很多host上的设备,并且可以执行mount。甚至允许你在docker容器中启动docker容器。
        ports: #主机和容器的端口映射
            - "2181:2181" 
        volumes:  #创建zookeeper容器在宿主机的挂载目录
            - /opt/zookeeper-cluster/zookeeper01/data:/data #数据
            - /opt/zookeeper-cluster/zookeeper01/datalog:/datalog #日志
            - /opt/zookeeper-cluster/zookeeper01/conf:/conf #配置文件
        environment: #zookeeper3.4 和zookeeper 3.5在docker环境下搭建集群差异就在这里 #zoo1为容器名,也是主机名,意思为使用容器的内网通信(1)Zookeeper3.5 中指定的 ZOO_SERVERS 参数的 IP 地址和端口号后面多加了 “;2181 ”。(2)ZOO_SERVERS 指定ip时本机的ip地址写 0.0.0.0。
            ZOO_MY_ID: 1
            ZOO_SERVERS: server.1=0.0.0.0:2888:3888;2181 server.2=zoo2:2888:3888;2181 server.3=zoo3:2888:3888;2181
    zoo2:
        image: zookeeper:3.5.7 #使用的镜像
        restart: always
        hostname: zoo2
        container_name: zoo2
        privileged: true
        ports:
            - "2182:2181"
        volumes:
           - /opt/zookeeper-cluster/zookeeper02/data:/data
           - /opt/zookeeper-cluster/zookeeper02/datalog:/datalog
           - /opt/zookeeper-cluster/zookeeper02/conf:/conf
        environment:
            ZOO_MY_ID: 2
            ZOO_SERVERS: server.1=zoo1:2888:3888;2181 server.2=0.0.0.0:2888:3888;2181 server.3=zoo3:2888:3888;2181
    zoo3:
        image: zookeeper:3.5.7 #使用的镜像
        restart: always 
        hostname: zoo3
        container_name: zoo3
        privileged: true
        ports: 
            - "2183:2181"
        volumes:
           - /opt/zookeeper-cluster/zookeeper03/data:/data
           - /opt/zookeeper-cluster/zookeeper03/datalog:/datalog
           - /opt/zookeeper-cluster/zookeeper03/conf:/conf
        environment:
            ZOO_MY_ID: 3
            ZOO_SERVERS: server.1=zoo1:2888:3888;2181 server.2=zoo2:2888:3888;2181 server.3=0.0.0.0:2888:3888;2181
            
# 启动该docker-compose实例
[root@iZ2ze0jazgwtuvi1lczdqfZ moudle]# docker-compose up -d
Creating zoo3 ... done
Creating zoo1 ... done
Creating zoo2 ... done

# 在当前目录查看当前集群运行状态
[root@iZ2ze0jazgwtuvi1lczdqfZ moudle]# docker-compose ps
Name              Command               State                                   Ports                                 
----------------------------------------------------------------------------------------------------------------------
zoo1   /docker-entrypoint.sh zkSe ...   Up      0.0.0.0:2181->2181/tcp,:::2181->2181/tcp, 2888/tcp, 3888/tcp, 8080/tcp
zoo2   /docker-entrypoint.sh zkSe ...   Up      0.0.0.0:2182->2181/tcp,:::2182->2181/tcp, 2888/tcp, 3888/tcp, 8080/tcp
zoo3   /docker-entrypoint.sh zkSe ...   Up      0.0.0.0:2183->2181/tcp,:::2183->2181/tcp, 2888/tcp, 3888/tcp, 8080/tcp
  • 由于 2888、3888 端口不需要暴漏,就不需要映射了
  • 使用自定义的网络,并指定每个容器的 IP
  • 每个容器之间的环境是隔离的,所以容器内所使用的端口是一样的

# 进入容器中进行验证
[root@iZ2ze0jazgwtuvi1lczdqfZ zookeeper]# docker exec -it a1925a /bin/bash
root@a1925ab286b2:/apache-zookeeper-3.7.0-bin# read escape sequence
[root@iZ2ze0jazgwtuvi1lczdqfZ zookeeper]# docker exec -it a1925a bash
root@a1925ab286b2:/apache-zookeeper-3.7.0-bin# ./bin/zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /conf/zoo.cfg
Client port found: 2181. Client address: localhost. Client SSL: false.
Mode: follower
root@a1925ab286b2:/apache-zookeeper-3.7.0-bin# exit
exit
[root@iZ2ze0jazgwtuvi1lczdqfZ zookeeper]# docker exec -it b011a26491a5 bash
root@b011a26491a5:/apache-zookeeper-3.7.0-bin# ./bin/zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /conf/zoo.cfg
Client port found: 2181. Client address: localhost. Client SSL: false.
Mode: leader
root@b011a26491a5:/apache-zookeeper-3.7.0-bin# exit
exit
[root@iZ2ze0jazgwtuvi1lczdqfZ zookeeper]# docker exec -it f23b929f8038 bash
root@f23b929f8038:/apache-zookeeper-3.7.0-bin# ./bin/zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /conf/zoo.cfg
Client port found: 2181. Client address: localhost. Client SSL: false.
Mode: follower
Zookeeper 的选举机制 Zookeeper 的选举机制 — 第一次启动

  • 服务器 1 启动,发起一次选举,服务器 1 投自己一票,此时服务器 1 票数一票,不够半数以上(三票),选举无法完成,服务器 1 状态保持为 LOOKING。
  • 服务器 2 启动,再次发起一次选举。服务器 1 和服务器 2 分别投自己一票并且交换选票信息:此时服务器 1 发现服务器 2 的 myid 比自己目前投票选举的(服务器 1)大,更改选票为推举服务器 2.此时服务器 1 票数 0 票,服务器 2 票数 2 票,没有半数以上结果,选举无法完成,服务器 1、2状态保持 LOOKING。
  • 服务器 3 启动,此时服务器 1 和 2 都会更改选票为服务器 3。此次投票结果为:服务器 1为 0票,服务器 2为 0票,服务器 3 为 3 票。此时服务器 3 的票数已经超过半数,服务器 3 当选为 Leader,服务器 1、2 更改状态为 FOLLOWING,服务器 3 更改状态为 LEADING 。
  • 服务器 4 启动,发起一次选举。此时服务器 1、2、3 已经不是 LOOKING 状态,不会更改选票信息。交换选票信息结果:服务器3为 3 票,服务器 4 为 1票。此时服务器 4 服从多数,更改选票信息为服务器 3,并更改状态为 FOLLOWING。
  • 服务器 5 启动,过程跟 4 一样。

注意:

  • SID:服务器 ID。用来唯一标识一台 Zookeeper 集群中的机器,每台机器不能重复,和 myid一致。
  • ZXID:事务 ID,ZXID是一个事务ID,用来标识一次服务器状态的变更。在某一时刻,汲取那种的每台机器的 ZXID 值不一定完全一致,这和 Zookeeper 服务器对于客户端更新请求的处理逻辑有关。(每次写 *** 作都有事务ID(ZXID))。
  • Epoch:每个 Leader 任期的代号。没有 Leader 时同一轮投票过程中的逻辑时钟值是相同的。每投完一次票这个数据就会增加。
Zookeeper 的选举机制 — 非第一次启动

  • 当 Zookeeper 集群中的一台服务器出现以下两种情况之一的时候,就会开始进入 Leader 选举。
    • 服务器初始化
    • 服务器运行期间无法和 Leader 保持连接
  • 而当一台机器进入 Leader 选举流程的时候,当前集群也可能会处于以下两种状态:
    • 集群当中本来就已经存在一个 Leader
      • 对于第一种已经存在 Leader 的情况,机器试图去选举 Leader 的时候,会被告知当前服务器的 Leader 信息,对于该机器来说,仅仅需要和 Leader 机器建立连接,并进行状态同步即可。
    • 集群中确实不存在 Leader
      • 假设 Zookeeper 由 5 台服务器组成,SID 分别为1、2、3、4、5,ZXID 分别为 8、8、8、7、7,并且此时SID 为 3 的服务器是 Leader。某一时刻,3 和 5 服务器出现故障,因此开始进行 Leader 选举。
      • SID 为 1、2、4的机器投票情况(1,8,1)(Epoch,ZXID,SID),(1,8,2),(1,8,4)
      • 选举 Leader 规则:1、EPOCH 大的直接胜出 2、EPOCH 相同,事务ID(ZXID)大的胜出 3、事务ID(ZXID)相同,服务器ID(SID)大的胜出

ZNode 节点数据信息 ls -s /(根节点的数据信息)

  • czxid:创建节点的事务 zxid
    • 每次修改 Zookeeper 状态都会产生一个 Zookeeper 事务 ID,事务 ID是 Zookeeper 中所有修改总的次序。每次修改都有唯一的 zxid,如果 zxid1 小于 zxid2,那么 zxid1 在 zxid2 之前发生。
  • ctime:znode 被创建的毫秒数(从 1970 年开始)
  • mzxid:znode 最后更新的事务 zxid
  • mtime:znode 最后修改的毫秒数(从 1970 年开始)
  • pZxid:znode 最后更新的子节点 zxid
  • cversion:znode 子节点变化号,znode 子节点修改次数
  • dataversion:znode 数据变化号
  • ephemeralOwner:如果是临时节点,这个是 znode 拥有者的 sessionId,如果不是,临时节点则是 0
节点类型(持久 / 短暂 / 有序号 / 无序号)
  • 持久(Persistent):客户端与服务端断开连接后,创建的节点不删除
  • 短暂(Ephemeral):客户端和服务端断开连接后,创建的节点自己删除

  • 持久化目录节点(Persistent)

    • 客户端与 Zookeeper 断开连接后,该节点依旧存在
  • 持久化顺序编号目录节点(Persistent_sequential)

    • 客户端与Zookeeper 断开连接后,该节点依旧存在,只是 Zookeeper 给该节点名称进行顺序编号

    • 说明:

    • 创建 ZNode 的时候设置顺序标识,znode 名称后会附加一个值,顺序号是一个单调递增的计数器,由父节点维护。

    • 在分布式系统中,顺序号可以被用于为所有的事件进行全局排序,这样客户端可以通过顺序号推断事件的顺序

  • 临时目录节点( Ephemeral)

    • 客户端与 Zookeeper 断开连接后,该节点被删除
  • 临时顺序编号目录节点(Ephemeral_sequential)

    • 客户端与 Zookeeper 断开连接后,该节点被删除,只是 Zookeeper 给该节点名称进行顺序编号
[zk: localhost:2181(CONNECTED) 1] ls / # 默认有一个zookeeper节点 
[zookeeper]
[zk: localhost:2181(CONNECTED) 2] ls -s / # 查看对应路径下的所有节点信息
[zookeeper]cZxid = 0x0
ctime = Thu Jan 01 00:00:00 UTC 1970
mZxid = 0x0
mtime = Thu Jan 01 00:00:00 UTC 1970
pZxid = 0x0
cversion = -1
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 0
numChildren = 1

[zk: localhost:2181(CONNECTED) 3] ls /
[zookeeper]

# 创建持久节点(默认)
[zk: localhost:2181(CONNECTED) 4] create /sanguo "diaochan" 
Created /sanguo
[zk: localhost:2181(CONNECTED) 5] ls /
[sanguo, zookeeper]
[zk: localhost:2181(CONNECTED) 6] create /sanguo/shuguo "liubei"
Created /sanguo/shuguo
[zk: localhost:2181(CONNECTED) 7] ls /
[sanguo, zookeeper]
[zk: localhost:2181(CONNECTED) 8] ls /sanguo
[shuguo]

# 获取对应节点内容
[zk: localhost:2181(CONNECTED) 9] get -s /sanguo
diaochan
cZxid = 0x100000004
ctime = Wed Dec 15 08:27:39 UTC 2021
mZxid = 0x100000004
mtime = Wed Dec 15 08:27:39 UTC 2021
pZxid = 0x100000005
cversion = 1
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 8
numChildren = 1
[zk: localhost:2181(CONNECTED) 10] create /sanguo/weiguo "caocao"
Created /sanguo/weiguo
[zk: localhost:2181(CONNECTED) 11] ls /sanguo
[shuguo, weiguo]

# 创建持久化顺序编号目录节点
[zk: localhost:2181(CONNECTED) 12] create -s /sanguo/weiguo/zhangliang "zhangliang"
Created /sanguo/weiguo/zhangliang0000000000
[zk: localhost:2181(CONNECTED) 13] ls /sanguo/weiguo/zhangliang
Node does not exist: /sanguo/weiguo/zhangliang
[zk: localhost:2181(CONNECTED) 14] ls /sanguo/weiguo
[zhangliang0000000000]
[zk: localhost:2181(CONNECTED) 15] ls2 /sanguo/weiguo
'ls2' has been deprecated. Please use 'ls [-s] path' instead.
[zhangliang0000000000]
cZxid = 0x100000006
ctime = Wed Dec 15 08:30:32 UTC 2021
mZxid = 0x100000006
mtime = Wed Dec 15 08:30:32 UTC 2021
pZxid = 0x100000007
cversion = 1
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 6
numChildren = 1
[zk: localhost:2181(CONNECTED) 16] ls /sanguo/weiguo/zhangliao "zhangliao"
'ls path [watch]' has been deprecated. Please use 'ls [-w] path' instead.
Node does not exist: /sanguo/weiguo/zhangliao
[zk: localhost:2181(CONNECTED) 17] create -s /sanguo/weiguo/zhangliao "zhangliao"
Created /sanguo/weiguo/zhangliao0000000001
[zk: localhost:2181(CONNECTED) 18] quit

WATCHER::

WatchedEvent state:Closed type:None path:null
root@zoo1:/apache-zookeeper-3.5.7-bin/bin# ./bin/zkCli.sh
bash: ./bin/zkCli.sh: No such file or directory
root@zoo1:/apache-zookeeper-3.5.7-bin/bin# zkCli.sh
Connecting to localhost:2181
log4j:WARN No appenders could be found for logger (org.apache.zookeeper.ZooKeeper).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
Welcome to ZooKeeper!
JLine support is enabled

WATCHER::

WatchedEvent state:SyncConnected type:None path:null
# 重启客户端后发现持久化节点还存在
[zk: localhost:2181(CONNECTED) 0] ls /sanguo
[shuguo, weiguo]
[zk: localhost:2181(CONNECTED) 1] ls /sanguo/weiguo
[zhangliang0000000000, zhangliao0000000001]

# 创建临时界定
[zk: localhost:2181(CONNECTED) 2] create -e /sanguo/wuguo "zhouyu"
Created /sanguo/wuguo
[zk: localhost:2181(CONNECTED) 3] ls /sanguo
[shuguo, weiguo, wuguo]

# 创建临时顺序编号节点
[zk: localhost:2181(CONNECTED) 4] create -s -e /sanguo/wuguo "zhouyu"
Created /sanguo/wuguo0000000003 # 节点的编号还在依次增加
[zk: localhost:2181(CONNECTED) 5] ls /sanguo
[shuguo, weiguo, wuguo, wuguo0000000003]
[zk: localhost:2181(CONNECTED) 6] quit

WATCHER::

WatchedEvent state:Closed type:None path:null
root@zoo1:/apache-zookeeper-3.5.7-bin/bin# zkCli.sh
Connecting to localhost:2181
log4j:WARN No appenders could be found for logger (org.apache.zookeeper.ZooKeeper).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
Welcome to ZooKeeper!
JLine support is enabled

WATCHER::

WatchedEvent state:SyncConnected type:None path:null
[zk: localhost:2181(CONNECTED) 0] ls /
[sanguo, zookeeper]

# 重启客户端发现临时节点 wuguo不存在
[zk: localhost:2181(CONNECTED) 1] ls /sanguo
[shuguo, weiguo]
监听器原理

监听器原理详解

  • 首先要有一个 main() 线程
  • 在main线程中创建 Zookeeper 客户端,这时就会创建两个线程,一个负责网络连接通信(Connect),一个负责监听(Listener)
  • 通过 connect 线程将注册的监听事件发送给 Zookeeper
  • 在 Zookeeper 的注册监听器列表中将注册的监听事件添加到列表中
  • Zookeeper 监听到有数据或者路径变化,就会将这个消息添加到列表中
  • listener 线程内部调用了 process() 方法

常见的监听

  • 监听节点数据的变化
  • 监听子节点增减的变化

监听节点数据的变化 get -w

# 启动两台zookeeper客户端
# 获取 /sanguo节点内容
[zk: localhost:2181(CONNECTED) 2] get -s /sanguo
diaochan
cZxid = 0x100000004
ctime = Wed Dec 15 08:27:39 UTC 2021
mZxid = 0x100000004
mtime = Wed Dec 15 08:27:39 UTC 2021
pZxid = 0x10000000d
cversion = 6
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 8
numChildren = 2

# 监听 /sanguo 节点
[zk: localhost:2181(CONNECTED) 3] get -w /sanguo
diaochan
# 当修改了/sanguo节点的内容后监听器返回监听动作
# 这里注意,因为只注册了一次监听动作,所以只能监听一次对应节点的变化,一次监听过后,则不再进行监听
[zk: localhost:2181(CONNECTED) 4] 
WATCHER::

WatchedEvent state:SyncConnected type:NodeDataChanged path:/sanguo

# 另一台客户端
# 第一次修改(被监听到)
[zk: localhost:2181(CONNECTED) 1] set /sanguo "xisi"
# 第二次修改(监听失效)
[zk: localhost:2181(CONNECTED) 2] set /sanguo "yangfeiyan"

监听节点数的变化 ls -w

# 一台客户端开启监听
[zk: localhost:2181(CONNECTED) 3] ls -w /sanguo
[shuguo, weiguo]
[zk: localhost:2181(CONNECTED) 4] 
WATCHER::

WatchedEvent state:SyncConnected type:NodeChildrenChanged path:/sanguo

# 另一台客户端
# 第一次创建节点(被监听到)
[zk: localhost:2181(CONNECTED) 4] create /sanguo/jin "simayi"
Created /sanguo/jin
# 第二次创建节点(监听失效)
[zk: localhost:2181(CONNECTED) 4] create /sanguo/ming "zhuyuanzhang"
Created /sanguo/ming

注意:节点的路径变化,也是注册一次,生效一次,想要多次生效,就需要多次注册。

节点的删除与查看
[zk: localhost:2181(CONNECTED) 2] ls /
[sanguo, zookeeper]
[zk: localhost:2181(CONNECTED) 3] ls /sanguo
[jin, shuguo, weiguo]

# 删除 /sanguo/jin 节点
[zk: localhost:2181(CONNECTED) 4] delete /sanguo/jin
[zk: localhost:2181(CONNECTED) 5] ls /sanguo
[shuguo, weiguo]

# delete 删除节点只能删除空节点,其下面不能有其他节点,不然不能使用delete删除
[zk: localhost:2181(CONNECTED) 6] delete /sanguo
Node not empty: /sanguo

# 使用deleteall可以删除有子节点的节点
[zk: localhost:2181(CONNECTED) 7] deleteall /sanguo 
[zk: localhost:2181(CONNECTED) 8] ls /
[zookeeper]

# 查看节点状态  使用stat + 节点路径
[zk: localhost:2181(CONNECTED) 10] stat /zookeeper
cZxid = 0x0
ctime = Thu Jan 01 00:00:00 UTC 1970
mZxid = 0x0
mtime = Thu Jan 01 00:00:00 UTC 1970
pZxid = 0x0
cversion = -2
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 0
numChildren = 2
客户端API的 *** 作

前提:保证 hadoop102、hadoop103、hadoop104服务器上 Zookeeper 集群服务端启动。

添加 pom 依赖




  org.apache.zookeeper
  zookeeper
  3.7.0




  org.apache.logging.log4j
  log4j-core
  2.15.0




  junit
  junit
  4.13.2
  test


创建 log4j.properties 文件

需要在项目的 src/main/resources 目录下,新建立一个文件,,命名为 log4j.properties,在文件中填入

log4j.rootLogger=INFO, stdout  
log4j.appender.stdout=org.apache.log4j.ConsoleAppender  
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout  
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n  
log4j.appender.logfile=org.apache.log4j.FileAppender  
log4j.appender.logfile.File=target/spring.log  
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout  
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n  

运行代码如下:

public class ZkClient {

    // 注意,这里是集群的时候不能加端口号,只需要写ip就可以了
    private String connectString = "39.107.103.173";
    private int sessionTimeout = 20000;
    private ZooKeeper zooKeeper;

    @Before
    public void init() throws IOException {
        zooKeeper = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {
                // 监听根目录下所有节点的集合
                List children = null;
                System.out.println("------------------------------");
                try {
                    children = zooKeeper.getChildren("/", true);
                    for (String ch : children){
                        System.out.println(ch);
                    }
                    System.out.println("------------------------------");
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });
    }

    @Test
    public void create() throws KeeperException, InterruptedException, IOException {
        System.out.println("连接状态是:"+zooKeeper.getState());
        // 创建一个节点
        String nodeCreated = zooKeeper.create("/atguigu1", "ssavi".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
    }

    @Test
    public void getChildren() throws KeeperException, InterruptedException {
        // 注意,因为zookeeper注册一次监听只能监听一次,所以再运行结束以后我们再进行增删节点 *** 作监听不到
        // 所以,要在注册中心注册监听,监听发结束以后就立马又注册了新的监听事件
        // 监听根目录下所有节点的集合
        List children = null;
        children = zooKeeper.getChildren("/", true);
        for (String ch : children){
            System.out.println(ch);
        }
        // 延时,保持一直监听
        Thread.sleep(Long.MAX_VALUE);
    }
    
    // 判断节点是否存在
    @Test
    public void exists() throws KeeperException, InterruptedException {
        Stat exists = zooKeeper.exists("/wcc", false);
        System.out.println(exists == null ? "not exists" : "exists");
    }
}
3.4、客户端向服务端写数据流程

写流程之写入请求直接发送给 Leader 节点

写流程之写入请求直接发送给 Follower 节点

4、服务器动态上下线监听案例

需求

某分布式系统中,主节点可以有多台,可以动态上下线,任意一台客户端都能实时感知到主节点服务器的上下线。

需求分析

具体实现

  • 先在集群上创建 /servers 节点

服务端代码

public class DistributeServer {

    ZooKeeper zooKeeper;
    private String connectString = "39.107.103.173";
    private int sessionTimeout = 200000;


    public static void main(String[] args) throws IOException, KeeperException, InterruptedException {
        DistributeServer server = new DistributeServer();
        // 第一步:获取 zk 连接
        server.getConnect();
        // 第二步:注册服务器到 zk 集群
        server.registServerToZookeeper(args[0]);
        // 第三步:启动业务逻辑(这里以休眠作为业务逻辑过程)
        server.business();
    }

    private void business() throws InterruptedException {
        Thread.sleep(Integer.MAX_VALUE);
    }

    private void registServerToZookeeper(String hostname) throws KeeperException, InterruptedException {
        // 创建临时并且带序号的节点
        zooKeeper.create("/servers/" + hostname, hostname.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
        System.out.println(hostname + " is online");
    }

    private void getConnect() throws IOException {
        zooKeeper = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {

            }
        });
    }
}

客户端代码

public class DistributeClient {

  private String connectString = "39.107.103.173";
  private int sessionTimeout = 300000;
  private static ZooKeeper zooKeeper;

  public static void main(String[] args) throws Exception {
    DistributeClient distributeClient = new DistributeClient();
    // 1.获取Zookeeper 连接
    distributeClient.getConnect();

    // 2.监听/servers下面子节点的增加和删除
    distributeClient.getServerList();

    // 3.业务逻辑(睡眠)
    distributeClient.business();
  }

  // 获取 Zookeeper 连接
  private void getConnect() throws Exception {
    zooKeeper = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
      @Override
      public void process(WatchedEvent watchedEvent) {
        try {
          getServerList();
        } catch (KeeperException e) {
          e.printStackTrace();
        } catch (InterruptedException e) {
          e.printStackTrace();
        }
      }
    });
  }

  // 监听/servers下面子节点的增加和删除
  private void getServerList() throws KeeperException, InterruptedException {
    // 获取到 /servers 下面所有的节点
    List children = zooKeeper.getChildren("/servers", true);
    List strings = new ArrayList<>();
    // 遍历每一个节点然后取出里面的值
    for (String str : children){
      byte[] data = zooKeeper.getData("/servers/" + str, false, null);
      strings.add(new String(data));
    }
    // 打印
    System.out.println(strings);
  }

  // 模拟业务逻辑代码
  private void business() throws InterruptedException {
    Thread.sleep(Integer.MAX_VALUE);
  }
}

执行流程:

  • 首先启动 服务端 创建一个 /servers 节点,然后关闭服务端程序
  • 之后启动客户端程序,在 xshell 终端手动添加节点(或者运行服务端来添加节点),观察控制台变化

  • 这样的话就实现了服务器的动态上下线的功能
5、Zookeeper 分布式锁的案例

什么叫做分布式锁?

比如进程1在使用该资源的时候,会先去获得锁,进程1获得锁以后会对该资源保持独占,这样其他进程就无法访问该资源,进程1用完该资源以后就将锁释放掉,让其他进程来获得锁,那么通过这个锁机制,我们就能保证了分布式系统中多个进程能够有序的访问该临界资源。那么我们把这个分布式环境下的这个锁叫做分布式锁。

public class DistributedLock {

  ZooKeeper zooKeeper;
  private final String connectString = "39.107.103.173";
  private final int sessionTimeout = 200000;

  private CountDownLatch countDownLatch = new CountDownLatch(1);
  private CountDownLatch waitLatch = new CountDownLatch(1);

  // 定义前一个节点的路径
  private String waitPath;

  String currentMode;

  public static void main(String[] args) throws InterruptedException, IOException, KeeperException {
    DistributedLock distributedLock1 = new DistributedLock();
    DistributedLock distributedLock2 = new DistributedLock();

    new Thread(new Runnable() {
      @Override
      public void run() {
        try {
          distributedLock1.zklock();
          System.out.println("线程1启动,获取到锁...");
          Thread.sleep(5000);
          distributedLock1.unZookeeperLock();
          System.out.println("线程1释放锁...");
        } catch (Exception e) {
          e.printStackTrace();
        }
      }
    }).start();

    new Thread(new Runnable() {
      @Override
      public void run() {
        try {
          distributedLock2.zklock();
          System.out.println("线程2启动,获取到锁...");
          Thread.sleep(5000);
          distributedLock2.unZookeeperLock();
          System.out.println("线程2释放锁...");
        } catch (Exception e) {
          e.printStackTrace();
        }
      }
    }).start();
  }

  public DistributedLock() throws IOException, InterruptedException, KeeperException {
    // 获取 zookeeper 集群的连接
    zooKeeper = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
      @Override
      public void process(WatchedEvent watchedEvent) {
        // countDownLatch 如果连接上zookeeper 可以释放该对象
        if(watchedEvent.getState() == Event.KeeperState.SyncConnected){
          countDownLatch.countDown();
        }
        // waitLatch 也需要释放,让其对应节点开始获取锁,当监听的节点被删除并且节点的路径跟路径相同,那么可以使其他线程开始争夺锁了
        if(watchedEvent.getType() == Event.EventType.NodeDeleted && watchedEvent.getPath().equals(waitPath)){
          waitLatch.countDown();
        }
      }
    });

    // 等待 zookeeper 正常连接后,往下走程序
    countDownLatch.await();

    // 判断根节点 locks 是否存在,返回的是状态信息
    Stat exists = zooKeeper.exists("/locks", false);

    if(exists == null){
      // 创建根节点
      String locks = zooKeeper.create("/locks", "locks".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
    }
  }

  // 对zookeeper 进行加锁
  public void zklock() throws KeeperException, InterruptedException {
    // 所谓的加锁就是创建对应的临时带序号节点
    currentMode = zooKeeper.create("/locks/" + "seq-", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
    System.out.println("创建的节点名称:" + currentMode);
    // 判断创建的节点是否是最小的序号节点,如果是获取到锁,如果不是,那就要监听它前一个节点
    List children = zooKeeper.getChildren("/locks", false);
    // 如果children只有一个值,那就直接获取锁,如果有多个节点,那就需要判断
    if(children.size() == 1){
      return;
    }else{
      // 先对子节点排好序
      Collections.sort(children);
      // 获取节点名称 seq-000000(序号)
      String thisNode = currentMode.substring("/locks/".length());
      // 通过获取到的序号获取在子节点中的位置
      int index = children.indexOf(thisNode);
      // 进行判断
      if(index < 0){
        System.out.println("数据异常...");
      }else if(index == 0){
        // 就一个节点,则可以获取锁了
        return;
      }else{
        // 不是一个节点,就需要进行监听其前一个节点
        waitPath = "/locks/" + children.get(index - 1);
        zooKeeper.getData(waitPath, true, new Stat());
        // 等待前一个节点监听
        waitLatch.await();
        return;
      }
    }
  }

  // 对 zookeeper 进行解锁
  public void unZookeeperLock() throws KeeperException, InterruptedException {
    // 删除节点即释放锁
    zooKeeper.delete(currentMode, -1);
  }
}

这里,为了更好的理解 zookeeper 实现分布式锁的过程,我们先说一下错误的分布式锁实现方式

错误的分布式锁实现方式

多个客户端同时去创建同一个临时节点,哪个客户端第一个创建成功,就成功的获取锁,其他客户端获取失败

就像双十一,很多人同时在线去秒杀一个商品,谁手速快谁就能秒杀掉

获取锁的流程

  • 四个客户端(这里以4个为例)同时创建一个临时节点
  • 谁第一个创建成功临时节点,就代表持有了这个锁(这里临时节点就代表锁)
  • 如果没有获取的客户端判断已经有任创建成功了,那么久开始监听这个临时节点的变化

释放锁的流程

  • 如果获取锁的客户端执行任务完毕(即创建节点成功),与 zookeeper 断开了连接
  • 这时候临时节点会自动被删除掉,因为临时节点在断开连接以后就会被删除
  • 其他没有获取到锁的客户端监听到临时节点删除了,就会一拥而上去创建临时节点(即创建锁)

存在的问题分析

当临时节点被删除的时候,其他的三个客户端一拥而上抢着创建节点,此时三个节点数量还比较少,性能上还看不出来什么问题。

那如果是一千个客户端在监听节点呢?一旦节点被删除了,会唤醒一千个客户端,一千个客户端同时来创建节点。

但是却只能有一个客户端能够获取到锁,却要让一千个客户端同时来竞争这个资源,对 zookeeper 的压力会很大,同时浪费这些客户端的线程资源,其中其他没有获取到锁的客户端都做了无用功。

这就叫做惊群现象,也叫羊群现象。

所以说,我们使用 zookeeper 正确实现分布式锁的方式应该是使用 顺序临时节点。

分析

为什么不采用持久节点 呢?因为持久节点必须要客户端手动删除,否则它会一直存在 zookeeper 中

如果我们的客户端获取到了锁,还没释放锁就突然宕机了,那么这个锁会一直存在不被释放,导致其他客户端无法获取锁

zookeeper 实现分布式锁的功能是比较健全的,但是性能上会比较差。比如 zookeeper 要维护集群自身信息的一致性,频繁创建和删除节点等原因。

5.1、Curator 框架实现分布式锁案例

原生的 JavaAPI 开发存在的问题

  • 会话连接是异步的,需要自己去处理。比如使用 CountDownLatch。
  • Watch 需要重复注册,不然就不能生效
  • 开发的复杂性还是比较高的
  • 不支持多节点删除和创建,需要自己去递归

导入对应的 maven 依赖


  org.apache.curator
  curator-client
  5.2.0



  org.apache.curator
  curator-recipes
  5.2.0



  org.apache.curator
  curator-framework
  5.2.0

public class CuratorLockTest {

  public static void main(String[] args) {
    // 创建分布式锁1
    InterProcessMutex lock1 = new InterProcessMutex(getCuratorframework(), "/locks");

    // 创建分布式锁2
    InterProcessMutex lock2 = new InterProcessMutex(getCuratorframework(), "/locks");

    new Thread(new Runnable() {
      @Override
      public void run() {
        try {
          // 获取锁
          lock1.acquire();
          System.out.println("线程1 获取到锁...");
          // 再次获取锁(说明是可重入锁)
          lock1.acquire();
          System.out.println("线程1 再次获取到锁...");
          Thread.sleep(5000);
          // 释放锁
          lock1.release();
          System.out.println("线程1 释放锁...");
          lock1.release();
          System.out.println("线程1 再次释放锁...");
        } catch (Exception e) {
          e.printStackTrace();
        }
      }
    }).start();

    new Thread(new Runnable() {
      @Override
      public void run() {
        try {
          // 获取锁
          lock2.acquire();
          System.out.println("线程2 获取到锁...");
          // 再次获取锁(说明是可重入锁)
          lock2.acquire();
          System.out.println("线程2 再次获取到锁...");
          Thread.sleep(5000);
          // 释放锁
          lock2.release();
          System.out.println("线程2 释放锁...");
          lock2.release();
          System.out.println("线程2 再次释放锁...");
        } catch (Exception e) {
          e.printStackTrace();
        }
      }
    }).start();
  }

  // 创建 Curator 客户端
  private static Curatorframework getCuratorframework(){
    ExponentialBackoffRetry policy = new ExponentialBackoffRetry(3000, 3);
    Curatorframework build = CuratorframeworkFactory.builder().connectString("39.107.103.173")
        .connectionTimeoutMs(200000)
        .sessionTimeoutMs(200000)
        .retryPolicy(policy) // 该参数的意思是创建连接失败的时候经过多少秒后再次尝试进行连接,尝试几次
        .build();
    // 启动客户端
    build.start();
    System.out.println("zookeeper 启动成功");
    return build;
  }
}

结果如下:

选举机制

半数机制,超过半数的投票通过,即为通过。

  • 第一次启动选举规则
    • 投票超过半数的时候,服务器 id 大的胜出
  • 第二次启动选举规则:
    • EPOCH(服务器选举任期代号) 大的直接胜出
    • EPOCH 相同,事务 id 大的胜出
    • 事务 id 相同,服务器 id 大的胜出
6、Zookeeper 源码分析

思考:Zookeeper 是如何保证数据一致性的?这也是困扰分布式系统框架的一个难题。

拜占庭将军问题

拜占庭将军是一个协议问题,拜占庭帝国军队的将军们必须全体一致的决定是否攻击某一敌军。问题是这些将军在地理位置上是分隔开的,并且将军中存在叛徒。叛徒可以任意行动以达到以下目标:欺骗某些将军采取进攻行动,促成一个不是所有将军都同意的决定,当将军们不希望进攻的时候促成进攻行动,或者迷惑某些将军,使得他们无法做出决定。如果叛徒达到了这些目的之一,则任何攻击行动的结果都是注定要失败的,只有完全达成一致性的努力才能获得胜利。

Paxos 算法

一种基于消息传递且具有高度容错特性的一致性算法。

Paxos 算法解决的问题:就是如何快速正确的在一个分布式系统中对某个数据值达成一致,并且保证不论发生任何异常,都不会破坏整个系统的一致性。

Paxos 算法的描述

  • 在一个 Paxos 系统中,首先将所有节点划分为 ProPoser(提议者)、Acceptor(接受者)和 Learner(学习者)(注意:每个节点都可以身兼数职)。
  • 一个完整的 Paxos 算法流程分为三个阶段:
  • Prepare 准备阶段:
    • Proposer 向多个 Acceptor 发出 Propose 请求 Promise(承诺)
    • Acceptor 针对收到的 Propose 请求进行 Promise(承诺)
  • Accept 接受阶段:
    • Proposer 收到多数 Acceptor 承诺的 Promise 后,向 Accrptor 发出的 Propose 请求
    • Acceptor 针对收到的 Propose 请求进行 Accept 处理
  • Learn 学习阶段:Proposer 将形成的决议发送给所有 Learners

Paxos 算法详细流程

  • Prepare:Proposer 生成全局唯一且递增的 Proposal ID,向所有 Acceptor 发送 Propose 请求,这里无需携带提案内容,只携带 Proposal ID 即可。
  • Promise:Acceptor 收到 Propose 请求后,做出两个承诺,一个应答
    • 不再接受 Proposal ID 小于等于 当前请求的 Propose 请求
    • 不再接受 Proposal ID 小于 当前请求的 Accept 请求
    • 不违背以前做出的承诺下,回复已经 Accept 过的提案中 Proposal ID 最大的那个提案的 Value 和 Proposal ID,没有则返回空值
  • Propose:Proposer 收到多数 Accept 的 Promise 应答后,从应答中选择 Proposal ID 最大的提案的 Value,作为本次要发起的提案。如果所有应答的提案 Value 均为空值,则可以自己随意决定提案 Value。然后携带当前 Proposal ID,向所有 Acceptor 发送 Propose 请求。
  • Accept:Acceptor 收到 Propose 请求后,在不违背自己之前做出的承诺下,接受并持久化当前 Proposal ID 和 提案 Value。
  • Learn:proposer 收到多数 Acceptor 的 Accept 后,决议形成,将形成的决议发送给所有的 Learn。

情况一:

情况二:

情况三:

造成这种情况的原因是系统中有一个以上的 Proposer(提案者),多个 Proposers 相互争夺 Acceptor,造成迟迟无法达成一致的情况。针对这种情况,一种改进的 paxos 算法被提出:从系统中选出一个节点作为 Leader,只有 Leader 能够发起提案。这样,一次 Paxos 流程中只有一个 Proposer,不会出现活锁的情况,此时只会出现例子中的第一种情况。

Zab协议

什么是 Zab 算法?

Zab 算法借鉴了 Paxos 算法,是特别为 Zookeeper 设计的支持崩溃恢复的原子广播协议。基于该协议,Zookeeper 设计为只有一台客户端(Leader)负责处理外部的写事务请求,然后 Leader 客户端将数据同步到其他 Follower 节点。即 Zookeeper 只有一个 Leader 可以发起提案。

Zab 协议的内容

Zab 协议包括两种基本的模式:消息广播、崩溃恢复。

消息广播的情况:

  • 客户端发起一个写 *** 作请求
  • Leader 服务器将客户端的请求转化为事务 Proposal 提案,同时为每个 Proposal 分配一个全局的 ID,即 zxid。
  • Leader 服务器为每个 Follower 服务器分配一个单独的对垒,然后将需要广播的 Proposal 依次放到队列中去,并且根据 FIFO 策略进行消息发送。
  • Follower 接收到 Proposal 后,会首先将其以事务日志的方式写入本地磁盘中,写入成功后向 Leader 反馈一个 Ack 响应消息。
  • Leader 接收到半数以上 Follower 的 Ack 响应消息后,即认为消息发送成功,可以发送 commit 消息。
  • Leader 向所有 Follower 广播 commit 消息,同时自身也会完成事务提交。Follower 接收到 commit 消息后,会将上一条事务提交。
  • Zookeeper 采用 Zab协议的核心,就是只要有一台服务器提交了 Proposal,就要确保所有的服务器最终都能正确提交 Proposal。

Zab 协议针对事务请求的处理过程类似于一个两阶段的提交过程:

  • 广播事务阶段
  • 广播提交 *** 作

这两阶段提交模型如下,有可能因为 Leader 宕机带来数据不一致,比如:

  • Leader 发起一个事务 Proposal 后就宕机,Follower 都没有 Proposal
  • Leader 收到半数 ACK 后宕机,没来得及向 Follower 发送 Commit

崩溃恢复 ----- 假设错误

一旦 Leader 服务器出现崩溃或者由于网络原因导致 Leader 服务器失去了与过半 Follower 的联系,那么就会进入崩溃恢复模式。

  • 假设两种服务器异常情况:
  • 假设一个事务在 Leader 提出之后,Leader 挂了。
  • 一个事务在 Leader 上提交了,并且过半的 Follower 都响应 Ack了,但是 Leader 在 Commit 消息发出之前挂了。

Zab 协议崩溃恢复要求满足以下两个要求:

  • 确保已经被 Leader 提交的提案 Proposal ,必须最终被所有的 Follower 服务器提交。
  • 确保丢弃已经被 Leader 提出的,但是没有被提交的 Proposal。

崩溃恢复 ----- Leader 选举

恢复主要包括两部分:Leader 选举 和 数据恢复。

Leader 选举:根据上述要求,Zab 协议需要保证选举出来的 Leader 需要满足以下条件:

  • 新选举出来的 Leader 不能包含未提交的 Proposal。即新 Leader 必须都是已经提交了 Proposal 的 Follower 服务器节点
  • 新选举的 Leader 节点中含有最大的 zxid。这样做的好处是可以避免 Leader 服务器检查 Proposal 的提交和丢弃工作。

崩溃恢复 ------ 数据恢复

Zab 如何数据同步?

  • 完成 Leader 选举后,在正式开始工作之前(接收事务请求,然后提出新的 Proposal),Leder 服务器会首先确认事务日志中的所有的 Proposal 是否已经被集群中过半的服务器 Commit。
  • Leader 服务器需要确保所有的 Follower 服务器能够接收到每一条事务的 Proposal,并且能够将所有已经提交的事务 Proposal 应用到内存数据中。等到 Follower 将所有尚未同步的事务 Proposal 都从 Leader 服务器上同步过,并且应用到内存数据中以后,Leader 才会把该 Follower 加入到真正可用的 Follower 列表中。

CAP 理论

CAP 理论告诉我们,一个分布式系统不可能同时满足以下三种情况:

  • 一致性(C:Consistency)
  • 可用性(A:Available)
  • 分区容错性(P:Partition Tolerance)

这三个基本需求,最多只能满足其中的两项,因为 P 是必须的,因此往往选择就在 CP 或者 AP 中。

  • 一致性

    在分布式环境中,一致性是指数据在多个副本之间是否能够保持数据一致的特性。在一致性的需求下,当一个系统在数据一致的状态下执行更新 *** 作后,应该保证系统的数据仍然处于一致的状态。

  • 可用性

    可用性是指系统提供的服务必须一直处于可用的状态,对于用户的每一个 *** 作请求总是能够在有限的时间内返回结果。

  • 分区容错性

    分布式系统在遇到任何网络分区故障的时候,仍然需要能够保证对外提供满足一致性和可用性的服务,除非是整个网络环境都发生了故障。

Zookeeper 保证的是 CP

  • Zookeeper 不能保证每次服务请求的可用性。(注:在极端环境下,Zookeeper 可能会丢弃一些请求,消费者程序更需要重新请求才能获得结果)。所以说,Zookeeper 不能保证服务可用性。
  • 进行 Leader 选举的时候集群不可用

辅助源码

Leader 和 Follower 中的数据会在内存和磁盘中各自保存一份,所以需要将内存中的数据持久化到磁盘中。(即在代码中是以序列化的形式来实现的)

在 org.apache.zookeeper.server.persistence 下的相关类都是序列化相关的代码

在磁盘当中通过快照机制和日志文件记录 *** 作来保存内存当中的数据,日志文件记录下所有的事务 *** 作,快照机制保存当前数据的镜像。

  • Zookeeper 中的数据模型,是一棵树,DataTree,每个节点,叫做 DataNode
  • Zookeeper 集群中的 DataTree 时刻保持状态同步
  • Zookeeper 集群中的每个 Zookeeper 节点中,数据在内存和磁盘中都有一份完整的数据
    • 内存数据: DataTree
    • 磁盘数据:快照文件 + 编辑日志

Zookeeper 服务端初始化源码解析

解析参数的过程

// 先找到QuorumPeerMain的main方法
public static void main(String[] args) {
        // 先找到main方法的入口,作为启动程序,创建QuorumPeerMain对象
        QuorumPeerMain main = new QuorumPeerMain();
        try {
            // 在初始化方法中解析参数
            main.initializeAndRun(args);
            ...
}

// 注意参数 args 是我们zookeeper的配置文件作为的参数 zoo.cfg
protected void initializeAndRun(String[] args)
    throws ConfigException, IOException, AdminServerException
{
    // 定义QuorumPeerConfig类对象,用以解析参数
    QuorumPeerConfig config = new QuorumPeerConfig();
    // 首先进行参数解析工作
    if (args.length == 1) { // 如果只有一个参数,则直接对第一个参数进行解析,有一个参数那说明就是我们的zoo.cfg文件 
        config.parse(args[0]);
    }
    // 启动定时任务,对过期快照,执行删除(默认是关闭)
    DatadirCleanupManager purgeMgr = new DatadirCleanupManager(config
            .getDataDir(), config.getDataLogDir(), config
            .getSnapRetainCount(), // 最少保留快照的个数 默认为3
        config.getPurgeInterval()); // 默认为0,表示关闭
    // 快照线程启动
    purgeMgr.start();
    ....
}
    
// 传进来的参数对应的是文件对应的路径 zoo.cfg文件的路径
public void parse(String path) throws ConfigException {
    LOG.info("Reading configuration from: " + path);

    try {
        // 根据传入进来的路径参数 path创建一个文件
        File configFile = (new VerifyingFileFactory.Builder(LOG)
            .warnForRelativePath()
            .failForNonExistingPath()
            .build()).create(path);
        // 通过Properties 的方式进行加载
        Properties cfg = new Properties();
        // 通过输入流来读取外部的路径文件
        FileInputStream in = new FileInputStream(configFile);
        try {
            // 加载配置文件内容
            cfg.load(in);
            configFileStr = path;
        } finally {
            in.close();
        }

        // 解析该路径对应的配置文件的内容
        parseProperties(cfg); 
        ...
}
    
    
// 参数zkProp:对应的多个对应的 K-V值
// 注意,这个方法主要是用来配置文件中的内容来给我们的成员局部变量来赋值,获取其中的值
// 这里,我们要看一下serverID是为什么跟myid文件中的内容一样的,是在方法最下面
public void parseProperties(Properties zkProp)
throws IOException, ConfigException {
    int clientPort = 0;
    int secureClientPort = 0;
    String clientPortAddress = null;
    String secureClientPortAddress = null;
    VerifyingFileFactory vff = new VerifyingFileFactory.Builder(LOG).warnForRelativePath().build();
    for (Entry entry : zkProp.entrySet()) {
        String key = entry.getKey().toString().trim();
        String value = entry.getValue().toString().trim();
        if (key.equals("dataDir")) {
            dataDir = vff.create(value);
        } else if (key.equals("dataLogDir")) {
            dataLogDir = vff.create(value);
        } else if (key.equals("clientPort")) {
            clientPort = Integer.parseInt(value);
        } else if (key.equals("localSessionsEnabled")) {
            localSessionsEnabled = Boolean.parseBoolean(value);
        } else if (key.equals("localSessionsUpgradingEnabled")) {
            localSessionsUpgradingEnabled = Boolean.parseBoolean(value);
        } else if (key.equals("clientPortAddress")) {
            clientPortAddress = value.trim();
        } else if (key.equals("secureClientPort")) {
            secureClientPort = Integer.parseInt(value);
        } else if (key.equals("secureClientPortAddress")){
            secureClientPortAddress = value.trim();
        } else if (key.equals("tickTime")) {
            tickTime = Integer.parseInt(value);
        } else if (key.equals("maxClientCnxns")) {
            maxClientCnxns = Integer.parseInt(value);
        } else if (key.equals("minSessionTimeout")) {
            minSessionTimeout = Integer.parseInt(value);
        } else if (key.equals("maxSessionTimeout")) {
            maxSessionTimeout = Integer.parseInt(value);
        } else if (key.equals("initLimit")) {
            initLimit = Integer.parseInt(value);
        } else if (key.equals("syncLimit")) {
            syncLimit = Integer.parseInt(value);
        } else if (key.equals("electionAlg")) {
            electionAlg = Integer.parseInt(value);
        } else if (key.equals("quorumListenOnAllIPs")) {
            quorumListenonAllIPs = Boolean.parseBoolean(value);
        } else if (key.equals("peerType")) {
            if (value.toLowerCase().equals("observer")) {
                peerType = LearnerType.OBSERVER;
            } else if (value.toLowerCase().equals("participant")) {
                peerType = LearnerType.PARTICIPANT;
            } else
            {
                throw new ConfigException("Unrecognised peertype: " + value);
            }
        } else if (key.equals( "syncEnabled" )) {
            syncEnabled = Boolean.parseBoolean(value);
        } else if (key.equals("dynamicConfigFile")){
            dynamicConfigFileStr = value;
        } else if (key.equals("autopurge.snapRetainCount")) {
            snapRetainCount = Integer.parseInt(value);
        } else if (key.equals("autopurge.purgeInterval")) {
            purgeInterval = Integer.parseInt(value);
        } else if (key.equals("standaloneEnabled")) {
            if (value.toLowerCase().equals("true")) {
                setStandaloneEnabled(true);
            } else if (value.toLowerCase().equals("false")) {
                setStandaloneEnabled(false);
            } else {
                throw new ConfigException("Invalid option " + value + " for standalone mode. Choose 'true' or 'false.'");
            }
        } else if (key.equals("reconfigEnabled")) {
            if (value.toLowerCase().equals("true")) {
                setReconfigEnabled(true);
            } else if (value.toLowerCase().equals("false")) {
                setReconfigEnabled(false);
            } else {
                throw new ConfigException("Invalid option " + value + " for reconfigEnabled flag. Choose 'true' or 'false.'");
            }
        } else if (key.equals("sslQuorum")){
            sslQuorum = Boolean.parseBoolean(value);
        } else if (key.equals("portUnification")){
            shouldUsePortUnification = Boolean.parseBoolean(value);
        } else if (key.equals("sslQuorumReloadCertFiles")) {
            sslQuorumReloadCertFiles = Boolean.parseBoolean(value);
        } else if ((key.startsWith("server.") || key.startsWith("group") || key.startsWith("weight")) && zkProp.containsKey("dynamicConfigFile")) {
            throw new ConfigException("parameter: " + key + " must be in a separate dynamic config file");
        } else if (key.equals(QuorumAuth.QUORUM_SASL_AUTH_ENABLED)) {
            quorumEnableSasl = Boolean.parseBoolean(value);
        } else if (key.equals(QuorumAuth.QUORUM_SERVER_SASL_AUTH_REQUIRED)) {
            quorumServerRequireSasl = Boolean.parseBoolean(value);
        } else if (key.equals(QuorumAuth.QUORUM_LEARNER_SASL_AUTH_REQUIRED)) {
            quorumLearnerRequireSasl = Boolean.parseBoolean(value);
        } else if (key.equals(QuorumAuth.QUORUM_LEARNER_SASL_LOGIN_CONTEXT)) {
            quorumLearnerLoginContext = value;
        } else if (key.equals(QuorumAuth.QUORUM_SERVER_SASL_LOGIN_CONTEXT)) {
            quorumServerLoginContext = value;
        } else if (key.equals(QuorumAuth.QUORUM_KERBEROS_SERVICE_PRINCIPAL)) {
            quorumServicePrincipal = value;
        } else if (key.equals("quorum.cnxn.threads.size")) {
            quorumCnxnThreadsSize = Integer.parseInt(value);
        } else {
            System.setProperty("zookeeper." + key, value);
        }
    }
    ....
    if (dynamicConfigFileStr == null) {
        // 该方法调用了setMyid方法设置了服务器id的值serverid和myid相同
        setupQuorumPeerConfig(zkProp, true);
        if (isDistributed() && isReconfigEnabled()) {
            // we don't backup static config for standalone mode.
            // we also don't backup if reconfig feature is disabled.
            backupOldConfig();
        }
    }
    ...
}
    
void setupQuorumPeerConfig(Properties prop, boolean configBackwardCompatibilityMode)
        throws IOException, ConfigException {
    quorumVerifier = parseDynamicConfig(prop, electionAlg, true, configBackwardCompatibilityMode);
    // 设置服务器对应的serverID 即myID
    setupMyId();
    setupClientPort();
    setupPeerType();
    checkValidity();
}
    
// 设置服务器的ID
private void setupMyId() throws IOException {
    // 找到dataDir数据目录下的myid文件
    File myIdFile = new File(dataDir, "myid");
    // standalone server doesn't need myid file.
    if (!myIdFile.isFile()) {
        return;
    }
    // 利用缓冲流读取文件内容
    BufferedReader br = new BufferedReader(new FileReader(myIdFile));
    String myIdString;
    try {
        // 因为myid文件中只有一行内容来存储服务器的id,所以这里只需要存储一行
        myIdString = br.readLine();
    } finally {
        br.close();
    }
    try {
        // 注意,这里将解析到的myid文件的内容的值赋值给了serverID,这就是服务器id和myid相同的原因
        serverId = Long.parseLong(myIdString);
        MDC.put("myid", myIdString);
    } catch (NumberFormatException e) {
        throw new IllegalArgumentException("serverid " + myIdString
                + " is not a number");
    }
}

Zookeeper 服务端初始化源码解析

Follower 和 Leader 状态同步源码

当选举结束后,每个节点都需要根据自己的角色更新自己的状态。选举出的 Leader 更新自己状态为 Leader,其他节点更新自己状态为 Follower。

Leader 更新状态入口:leader.lead()

Follower 更新状态入口:follower.followerLeader()

注意:

  • Follower 必须要让 leader 知道自己的状态:epoch、zxid、sid

    • 必须找出谁是 Leader
    • 发起请求连接 Leader
    • 发送自己的信息给 Leader
    • Leader 接收到信息,必须要返回对应的信息给 Follower
  • 当 Leader 得知 Follower 的状态了,就确定需要做何种方式的数据同步 DIFF、TRUNC、SNAPO

  • 执行数据同步

  • 当 Leader 接收召超过半数的 Follower 的 ACK 之后,进入正常工作状态,集群启动完成了。
    最终总结同步方式为:

  • DIFF 咱两一样,不需要做什么

  • TRUNC Follower 的 zxid 比 Leader 的 zxid 大,所以 Follower 要回滚

  • COMMIT Leader 的 zxid 比 Follower 的 zxid 大,发送 Proposal 给 Follower 提交执行

  • 如果 Follower 并没有任何数据,直接使用 SNAP 的方式来执行数据同步(直接把数据全部序列化到 Follower)

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5682872.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存