Zookeeper-分布式锁

Zookeeper-分布式锁,第1张

Zookeeper-分布式

由于在多线程环境下上下文的切换,数据可能出现不一致的情况,我们需要保证数据安全,所以我们能想到的第一个就是加锁,所谓加锁是当一个线程访问某个数据时,进行限制,其他线程不能进行访问,直到该线程读取完,其他线程才可使用。

这里我们以 Redis 秒杀为例:

首先我们把库存预热到了Redis缓存中,库存为1。

A 服务去 Redis 查询库存发现为 1,说明商品存在,我去抢并且准备减 1 ,这时候库存还没有减。

这时 B 服务也去查询库存发现也是 1,这时 B 服务也抢到了,那也减 1。

C 服务同上。

等所有的服务都判断完了,这时发现库存变为了 -2,发生了超卖现象。

从上面的例子可以知道,单个服务去访问 Redis 的时候,因为 Redis 本身单线程的原因,不用考虑线程安全的问题,但是在分布式集群条件下那就出现大问题了,这就需要分布式锁的介入了。

下面我们主要讲解 分布式锁-ZooKeeper。关于 ZooKeeper的介绍可以看我上一篇分享的文章:
ZooKeeper能做什么?

在学习 ZooKeeper 实现分布式锁之前,我们应该需要了解一些 ZooKeeper 的知识,ZooKeeper主要存节点,ZooKeeper 的节点类型有4大类型:

  • 持久化节点:ZooKeeper 断开节点还在
  • 持久化顺序编号目录节点
  • 临时目录节点:客户端断开后节点就删除了
  • 临时目录编号目录节点

这里提示一下:节点的名称都是唯一的

那么如何创建节点?

创建永久节点 :create /test huashao

创建临时节点:create -e /test huashao

创建顺序节点:create -s /test

创建临时顺序节点:create -e -s /test

创建临时节点成功,如果断开这次链接,这个节点自然就消失了
创建临时顺序节点,退出后,重新连接,会发现创建的所有临时节点都没了

这里我们以减少库存为例,模拟并发竞争这样一个情况下会发生什么。

@SpringBootApplication
public class DemoApplication {

    public static class ZKStock {

        private static int inventory = 1;

        public static boolean reduseStock() {
            if (inventory > 0) {
                inventory--;
                return true;
            } else {
                return false;
            }
        }
    }

    static class zkTest  implements Runnable {
        public void run() {
            boolean b = new ZKStock().reduseStock();
            if (b) {
                System.out.println(Thread.currentThread().getName() + ":扣减库存成功!");
            } else {
                System.out.println(Thread.currentThread().getName() + ":扣减库存失败!");
            }
        }
    }

    public static void main(String[] args) {
            new Thread(new zkTest (), "用户张三").start();
            new Thread(new zkTest (), "用户李四").start();
    }
}


这里我定义了一个库存 inventory 值为1,并且 2 个线程一起去扣减库存,发现没有问题,扣除正常,如果这时睡眠 1 秒,看看会出现什么情况:

        public static boolean reduseStock() {
            if (inventory > 0) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                inventory--;
                return true;
            } else {
                return false;
            }
        }

这时用户发现自己拿到的库存都是 1,两个用户都认为自己抢到了,都做了减一的 *** 作,但是等所有人都执行完,再去 set 值的时候,发现其实已经超卖了。

那怎么解决这个问题?这时我们会想到用同步代码块或者锁机制来解决,但是sync,lock也只能保证当前机器线程安全,这样分布式访问还是有问题。

    private static Lock lock=new ReentrantLock();
    static class zkTest implements Runnable {
        public void run() {
            //上锁
            lock.lock();
            //调用减少库存的方法
            boolean b = new ZKStock().reduseStock();
            //解锁
            lock.unlock();
            if (b) {
                System.out.println(Thread.currentThread().getName() + ":扣减库存成功!");
            } else {
                System.out.println(Thread.currentThread().getName() + ":扣减库存失败!");
            }
        }
    }

这时候 ZooKeeper 的节点就可以解决这个问题。


ZooKeeper 通过创建临时序列节点来实现分布式锁,适用于顺序执行的程序,大体思路就是创建临时序列节点,找出最小的序列节点,获取分布式锁,程序执行完成之后此序列节点消失,通过 watch 来监控节点的变化,从剩下的节点的找到最小的序列节点,获取分布式锁,执行相应处理,依次类推……

下面以代码测试一下:

1、首先引用 zk

public class ZooKeeperLock implements Lock {

    //zk客户端
    private ZooKeeper zk;

    //zk是一个目录结构,lock
    private String node = "/lock";

    //锁的名称
    private String lockName;

    //当前线程创建的序列node
    private ThreadLocal nodeId = new ThreadLocal<>();

    //用来同步等待 zkclient 链接到了服务端
    private CountDownLatch connectedSignal = new CountDownLatch(1);

    private final static int sessionTimeout = 3000;

    private final static byte[] data = new byte[0];

    public ZooKeeperLock(String IP_PORT, String lockName) {
        this.lockName = lockName;

        try {
            zk = new ZooKeeper(IP_PORT, sessionTimeout, new Watcher() {

                @Override
                public void process(WatchedEvent event) {
                    // 建立连接
                    if (event.getState() == Event.KeeperState.SyncConnected) {
                        connectedSignal.countDown();
                    }
                }
            });

            connectedSignal.await();
            Stat stat = zk.exists(node, false);
            if (null == stat) {
                // 创建根节点
                zk.create(node, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
    //获取锁 *** 作
    @Override
    public void lock() {
        try {
            // 创建临时子节点
            String myNode = zk.create(node + "/" + lockName, data, ZooDefs.Ids.OPEN_ACL_UNSAFE,
                    CreateMode.EPHEMERAL_SEQUENTIAL);

            System.out.println(Thread.currentThread().getName() + myNode + "created");

            // 取出所有子节点
            List subNodes = zk.getChildren(node, false);
            TreeSet sortedNodes = new TreeSet<>();
            for (String node : subNodes) {
                sortedNodes.add(node + "/" + node);
            }

            String smallNode = sortedNodes.first();

            if (myNode.equals(smallNode)) {
                // 如果是最小的节点,则表示取得锁
                System.out.println(Thread.currentThread().getName() + myNode + "get lock");
                this.nodeId.set(myNode);
                return;
            }

            String preNode = sortedNodes.lower(myNode);

            CountDownLatch latch = new CountDownLatch(1);
            Stat stat = zk.exists(preNode, new LockWatcher(latch));// 同时注册监听。
            // 判断比自己小一个数的节点是否存在,如果不存在则无需等待锁,同时注册监听
            if (stat != null) {
                System.out.println(Thread.currentThread().getName() + myNode +
                        " waiting for " + node + "/" + preNode + " released lock");

                latch.await();// 等待,这里应该一直等待其他线程释放锁
                nodeId.set(myNode);
                latch = null;
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }


    @Override
    public void unlock() {
        try {
            System.out.println(Thread.currentThread().getName() + "unlock ");
            if (null != nodeId) {
                zk.delete(nodeId.get(), -1);
            }
            nodeId.remove();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (KeeperException e) {
            e.printStackTrace();
        }
    }


    @Override
    public void lockInterruptibly() throws InterruptedException {

    }

    @Override
    public boolean tryLock() {
        return false;
    }

    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return false;
    }

    @Override
    public Condition newCondition() {
        return null;
    }
}

3、添加 watch 监听临时顺序节点的删除的类

public class LockWatcher implements Watcher {
    private CountDownLatch latch = null;

    public LockWatcher(CountDownLatch latch) {
        this.latch = latch;
    }

    @Override
    public void process(WatchedEvent event) {
        if (event.getType() == Event.EventType.NodeDeleted) {
            latch.countDown();
        }
    }
}

最后测试

@SpringBootApplication
public class DemoApplication {


    private static ZooKeeperLock zkLock;

    static{
        zkLock = new ZooKeeperLock("127.0.0.1:2181","stock_zk");
    }

    public static class ZKStock {

        private static int inventory = 1;

        public static boolean reduseStock() {
            if (inventory > 0) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                inventory--;
                return true;
            } else {
                return false;
            }
        }
    }

    static class zkTest  implements Runnable {
        public void run() {
            //上锁
            zkLock.lock();

            //调用减少库存的方法
            boolean b = new ZKStock().reduseStock();

            //解锁
            zkLock.unlock();
            if (b) {
                System.out.println(Thread.currentThread().getName() + ":扣减库存成功!");
            } else {
                System.out.println(Thread.currentThread().getName() + ":扣减库存失败!");
            }
        }
    }

    public static void main(String[] args) {
            new Thread(new zkTest (), "用户张三").start();
            new Thread(new zkTest (), "用户李四").start();
    }
}


这里创建了我们指定的节点

那么 ZK 在分布式锁中有哪些缺点呢?

Zk性能上可能并没有缓存服务那么高。

因为每次在创建锁和释放锁的过程中,都要动态创建、销毁瞬时节点来实现锁功能。

ZK中创建和删除节点只能通过Leader服务器来执行,然后将数据同步到所有的Follower机器上。

使用Zookeeper也有可能带来并发问题。

建议使用 Redis 的分布式锁

欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/zaji/5688086.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存