Zookeeper

Zookeeper,第1张

Zookeeper Zookeeper

官网地址:https://zookeeper.apache.org/doc/current/zookeeperOver.html

一、概述

概述参考官网

1.什么是Zookeeper?

Zookeeper是分布式应用程序的分布式开源协调服务。它公开了一组简单的原语,分布式应用程序可以构建这些原语来实现更高级别的同步、配置维护以及组和命名服务。

Zookeeper出现的最终目的是减轻分布式应用程序从头开始实现协调服务的责任。

2.Zookeeper的特点

  • Zookeeper服务以一个Leader,多个follower组成集群,集群中各个服务节点的数据一致,集群中直到大多数节点可用,整个服务就可以正常运行。
  • 顺序一致性 - 来自客户端的更新将按发送顺序应用。
  • 原子性 - 更新要么成功要么失败。没有部分结果。
  • 单一系统映像 - 无论客户端连接到哪个服务器,它都会看到相同的服务视图。
  • 可靠性 - 应用更新后,它将从那时起持续存在,直到客户端覆盖更新。
  • 及时性 - 系统的客户视图保证在特定时间范围内是最新的。

3.如何实现的,其数据结构是怎么样的

Zookeeper提供的命名空间很像标准文件系统的命名空间(树形结构)。名称以/分隔的一系列路径元素。每个节点都由路径标识。

与标准的文件系统的区别在于:Zookeeper的命名空间的每一个节点都可以拥有其关联的数据和子节点(可以理解为:Linux中的目录可以存储数据,文件也可以作为一个目录)。

每一个节点称之为Znode。它维护了数据更改、ACL(访问控制列表)和时间戳的版本号等。znode的读写是原子性的。每个节点存储的数据很有限(MB以内)。

临时节点:只要创建的znode的会话处于活动状态就存在,会话结束时就被删除。

4.提供的功能(使用场景)

  • 统一命名服务:对服务进行统一命名,便于识别(类似于网关)
  • 统一配置管理:对于集群中的各个节点配置信息的统一管理;配置修改后,快速同步到各个节点中。
  • 统一集群管理:实时监控节点状态变化。
  • 软负载均衡:合理分配请求
二、集群 2.1 集群搭建

详见《Zookeeper集群搭建篇》

集群官网参考

选举机制

第一次启动:

  • 集群第一次开启时,每台服务器启动时都会将票投给自己
  • 当集群中由其他服务器启动时,比较他们之间的myid,将票传递给myid大的服务器。此时如果票数半数以上,则该服务器就被选举为Leader
  • 当集群中有Leader之后,再有服务器启动不会再选举

非第一次启动

  • Leader存活:一台follow与leader失去连接后,集群中的leader依然存活,则该follow会一直尝试连接;
  • Leader宕机:由存活的follow重新选举,选举一句为:
    1. 先比较各服务器的EPOCH(该节点的最后一个Leader的代号),大的胜出
    2. 如第一步相同,则比较ZXID(事务ID), 大的胜出
    3. 如第二部相同,则比较SID(服务器ID,也就是myid),大的胜出
2.2 客户端 *** 作

启动客户端

bin/zkCli.sh -server ip:2181

创建节点

Zookeeper的节点类型:

  • 持久:客户端与服务端断开连接后,创建的阶段不会删除
  • 短暂:断开连接后创建的节点会删除

持久或短暂节点各自又有带序号和不带序号的节点。

创建持久节点:

create /country "china"
create /country/zj "zj"

创建持久节点带序号:

create -s /country/zj "zj"

创建临时节点

create -e /country/xxx "xxx"

创建临时节点带序号

create -es /country/xxx "xxx"

查看节点

get /country
get -s /country/zj

修改节点

set /country/xxx "aaa"

删除节点

delete /country/xxx
# 删除节点及其子节点
deleteall /country

监听器

# 当该节点的数据变化的时候就会被监听到,只能监听一次
get -w /country/zj

# 监听节点数据量变化
ls -w /country
2.3 集群写数据

客户端直接访问Leader

  1. 客户端请求Leader;
  2. 由Leader执行写 *** 作;
  3. Leader将数据写到follower中;
  4. 当半数以上节点都写完后,返回客户端确认。

客户端直接访问Follower

  1. 客户端访问Follower;
  2. Follower将请求转发给Leader,Leader执行写 *** 作;
  3. Leader将数据写到Followder中;
  4. 当半数以上节点写完后,Leader通知被访问的Follower,并由Follower返回客户端确认。
三、示例 3.1 服务上下线的监听

实现目标:

  • 在Zookeeper的根目录下有节点/servers;
  • 客户端Client监听"/servers/"下子节的变化,当有节点创建或删除时便会打印;
  • 服务端Server负责在zookeeper中创建节点。

Server:

public class Server1 {
    ZooKeeper zooKeeper;
    // 连接地址
    private String conn = "192.168.72.161:2181,192.168.72.162:2181,192.168.72.163:2181";
    // 超时时间
    private int timeout = 30000;
	// 减计数器
    private static CountDownLatch downLatch= new CountDownLatch(1);

    public static void main(String[] args) throws Exception {
        Server1 server = new Server1();
        // 1.获取Zookeeper连接
        server.getConnection();
        // 此步是为了保证连接成功之后再进行 *** 作,否则可能会初出现异常
        downLatch.await();
        // 2.创建节点
        server.createNode(args[0]);
        // 3.创建完成,执行业务逻辑
        System.out.println("节点创建完成:" + args[0]);
        Thread.sleep(Long.MAX_VALUE);
    }
    
    private void createNode(String host) throws Exception {
        // 创建有序临时节点
        zooKeeper.create("/servers/"+host, host.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
    }
    
    private void getConnection() throws IOException {
        zooKeeper = new ZooKeeper(conn, timeout, new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {
                // 连接成功
                if (watchedEvent.getState() == Event.KeeperState.SyncConnected){
                    downLatch.countDown();
                }
            }
        });
    }
}

Client:

public class Client1 {

    private ZooKeeper zooKeeper;
    // 连接地址
    private String conn = "192.168.72.161:2181,192.168.72.162:2181,192.168.72.163:2181";
    // 超时时间
    private int timeout = 30000;
    // 减计数器
    private static CountDownLatch downLatch= new CountDownLatch(1);


    public static void main(String[] args) throws Exception {
        Client1 client = new Client1();
        // 1.获取连接
        client.getConnection();
        // 此步是为了保证连接成功之后再进行 *** 作,否则可能会初出现异常
        downLatch.await();
        // 2.监听
        client.monitor();
        // 3.等待处理
        Thread.sleep(Long.MAX_VALUE);
    }
    
    private void monitor() throws Exception {
        // 获取该目录下的字节点
        List children = zooKeeper.getChildren("/servers", true);
        ArrayList servers = new ArrayList<>();
        if (children.size() == 0) return;
        // 获取子节点的数据
        for (String child : children){
            byte[] data = zooKeeper.getData("/servers/"+child, false, null);
            servers.add(new String(data));
        }
        System.out.println("节点变化:" + servers);
    }
    
    private void getConnection() throws IOException {
        zooKeeper = new ZooKeeper(conn, timeout, new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {
                try {
                    // 连接成功
                    if (watchedEvent.getState() == Event.KeeperState.SyncConnected){
                        downLatch.countDown();
                    }
                    monitor();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });
    }
}
3.2 分布式锁

实现原理:

创建一个locks节点,在其子节点中创建锁,如下:

/
|___locks
	  |___seq-00000001
	  |___seq-00000002
	  |___seq-00000003
	  |___seq-00000004
	  ......
  • 每当一个用户需要 *** 作共享资源的时候,就会先创建一个seq-节点(获取锁),并判断该节点是否是的序号第一个节点;
    • 如果是:则直接返回,标识获取锁成功
    • 如果不是:则监听该节点的前一个节点,只有当前一个节点被删除(前一个用户释放锁),才返回,标识可以获取锁了
  • 当获取所的用户 *** 作完成后,删除该节点以释放锁。

代码:

锁的实现:

public class DistributeLock {

    private  ZooKeeper zooKeeper;
    // 连接地址
    private static String conn = "192.168.72.161:2181,192.168.72.162:2181,192.168.72.163:2181";
    // 超时时间
    private static int timeout = 20000;
    // 用于记录是否连接成功
    private CountDownLatch connLatch= new CountDownLatch(1);
    // 用于记录获取锁的客户端是否执行完成
    private  CountDownLatch waitLatch= new CountDownLatch(1);
    // 前一个节点
    private  String waitPath;
    // 当前节点
    private String currentLock;

    public DistributeLock() throws Exception {
        // 1.获取连接
        zooKeeper = new ZooKeeper(conn, timeout, new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {
                // 连接成功则进行后续 *** 作
                if (watchedEvent.getState() == Event.KeeperState.SyncConnected){
                    connLatch.countDown();
                }
                // 如果锁节点被删除则释放
                if (watchedEvent.getType() == Event.EventType.NodeDeleted && watchedEvent.getPath().equals(waitPath)){
                    waitLatch.countDown();
                }
            }
        });
        // 等待连接完成
        connLatch.await();
        // 2.判断锁节点是否存在
        Stat stat = zooKeeper.exists("/locks", false);
        if (stat == null){
            zooKeeper.create("/locks", "locks".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        }
    }
    
    public void getLock(){
        try {
            // 在锁节点下创建临时节点
            currentLock = zooKeeper.create("/locks/seq-", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
            // 判断当前节点是否是最小序号的节点,如果是则获取到锁,否则监听前一个锁
            List children = zooKeeper.getChildren("/locks", false);
            if (children.size() == 1){
                return;
            }else {
                // 排序
                Collections.sort(children);
                // 获取当前新建节点的位置
                String nodeName = currentLock.substring("/locks/".length());
                int index = children.indexOf(nodeName);
                if (index == -1){
                    System.out.println("数据异常");
                }else if (index == 0){
                    // 当前节点就是第一个
                    return;
                }else {
                    // 监听前一个节点
                    waitPath = "/locks/" + children.get(index-1);
                    zooKeeper.getData(waitPath, true, null);
                    // 等待资源 *** 作完成
                    waitLatch.await();
                    return;
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    
    public void rmLock(){
        // 删除节点
        try {
            zooKeeper.delete(currentLock, -1);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

测试:

public class LockTest {
    public static void main(String[] args) throws Exception {
        DistributeLock lock1 = new DistributeLock();
        DistributeLock lock2 = new DistributeLock();


        // 创建两个线程竞争
        new Thread(()->{
            try {
                lock1.getLock();
                System.out.println("Thread1获得锁");
                Thread.sleep(5000);
                lock1.rmLock();
                System.out.println("Thread1释放锁");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, "thread1").start();

        new Thread(()->{
            try {
                lock2.getLock();
                System.out.println("Thread2获得锁");
                Thread.sleep(5000);
                lock2.rmLock();
                System.out.println("Thread2释放锁");
            } catch (Exception e) {
                e.printStackTrace();
            }
        },"thread2").start();
    }
}

测试说明:创建两个锁对象分别被两个线程用于获取锁和释放锁。以代码为例假设线程1先获取到锁:

  • 线程1获取到锁,在zookeeper中创建节点:/locks/seq-0000000001;且该节点的序号为1所以是第一个,所以直接获取到锁,即可以 *** 作共享资源;
  • 线程2启动后尝试获取锁,也创建了节点:/locks/seq-0000000002;但是001节点未释放,所以该节点是第二个节点,因此需要监听001节点被删除时才能获取到锁;
  • 5s后,线程1删除节点(释放锁);监听到001节点被删除后,线程2获取到锁;线程2便可以 *** 作共享资源。
  • 结果为:线程1获得锁----->5s后,线程1释放锁------>线程2获得锁------>5s后,线程2释放锁

也可以使用Curator框架更简单的实现:官网地址

四、扩展

Paxos算法

是什么?一种基于消息传递且具有高度容错性的一致性算法。

解决了?如何快速正确的在一个分布式系统中对某个数据达成一致,并且保证不论发生任何异常都不会破坏整个系统的一致性。

ZAB协议

是什么?全称Zookeeper Atomic Broadcast(Zookeeper原子广播)。是专门设计的一种支持崩溃恢复的原子广播协议。

解决了?通过zab协议来保证分布式事务的**最终一致性。**实现Leader与Follower之间的数据一致性。

详细可参考博客:https://www.jianshu.com/p/2bceacd60b8a

CAP理论

Consistency(一致性):在分布式环境下,各个副本之间的数据能否保证一致

Available(可用性):系统是否能一致保证服务可用(在限定的时间内正确的返回结果)

Partition Tolerance(分区容错性):当部分网络分区出现故障的时候,任然能够保证对外提供服务

集群搭建

一、集群规划

这里使用三台集群(最少三台),分别是:

  • hadoop102:192.168.72.161
  • hadoop103:192.168.72.162
  • hadoop104:192.168.72.163

二、集群安装

  • 下载压缩包:https://archive.apache.org/dist/zookeeper/

  • 上传并解压

    tar -zxvf zookeeper-3.5.7-bin-tar.gz  -C 具体目录
    
  • 修改解压目录下的配置文件

    # 数据存储目录
    mkdir zkData
    cd conf
    mv zoo-sample.cfg zoo.cfg
    vim zoo.cfg
    # 修改内容
    dataDir= /......./zkData
    
  • 修改id

    # 在zkData目录下创建文件
    vim myid
    # 输入内容
    1
    
  • 将zookeeper拷贝值其他两台机器

    # 需要配置hadoop103、hadoop104的host地址,否则直接使用ip
    scp -r zookeeper3.5.7 root@hadoop103:/opt/module/zookeeper3.5.7/
    scp -r zookeeper3.5.7 root@hadoop104:/opt/module/zookeeper3.5.7/
    
  • 分别修改node2、node3中的myid为2,3

  • 三台机器的zoo.cfg分别添加:

    server.2=hadoop102:2888:3888
    server.3=hadoop103:2888:3888
    server.4=hadoop104:2888:3888
    
  • 分别启动三台zookeeper

    bin/zkServer.sh start
    # 查看状态
    bin/zkServer.sh status
    # 显示如下:
    ZooKeeper JMX enabled by default
    Using config: /opt/module/zookeeper3.5.7/zookeeper3.5.7/bin/../conf/zoo.cfg
    Client port found: 2181. Client address: localhost.
    Mode: leader  # 或者是follower
    

欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/zaji/5669286.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-16
下一篇 2022-12-16

发表评论

登录后才能评论

评论列表(0条)

保存