由于在多线程环境下上下文的切换,数据可能出现不一致的情况,我们需要保证数据安全,所以我们能想到的第一个就是加锁,所谓加锁是当一个线程访问某个数据时,进行限制,其他线程不能进行访问,直到该线程读取完,其他线程才可使用。
这里我们以 Redis 秒杀为例:
首先我们把库存预热到了Redis缓存中,库存为1。
A 服务去 Redis 查询库存发现为 1,说明商品存在,我去抢并且准备减 1 ,这时候库存还没有减。
这时 B 服务也去查询库存发现也是 1,这时 B 服务也抢到了,那也减 1。
C 服务同上。
等所有的服务都判断完了,这时发现库存变为了 -2,发生了超卖现象。
从上面的例子可以知道,单个服务去访问 Redis 的时候,因为 Redis 本身单线程的原因,不用考虑线程安全的问题,但是在分布式集群条件下那就出现大问题了,这就需要分布式锁的介入了。
下面我们主要讲解 分布式锁-ZooKeeper。关于 ZooKeeper的介绍可以看我上一篇分享的文章:
ZooKeeper能做什么?
在学习 ZooKeeper 实现分布式锁之前,我们应该需要了解一些 ZooKeeper 的知识,ZooKeeper主要存节点,ZooKeeper 的节点类型有4大类型:
- 持久化节点:ZooKeeper 断开节点还在
- 持久化顺序编号目录节点
- 临时目录节点:客户端断开后节点就删除了
- 临时目录编号目录节点
这里提示一下:节点的名称都是唯一的
那么如何创建节点?
创建永久节点 :create /test huashao
创建临时节点:create -e /test huashao
创建顺序节点:create -s /test
创建临时顺序节点:create -e -s /test
创建临时节点成功,如果断开这次链接,这个节点自然就消失了
创建临时顺序节点,退出后,重新连接,会发现创建的所有临时节点都没了
这里我们以减少库存为例,模拟并发竞争这样一个情况下会发生什么。
@SpringBootApplication public class DemoApplication { public static class ZKStock { private static int inventory = 1; public static boolean reduseStock() { if (inventory > 0) { inventory--; return true; } else { return false; } } } static class zkTest implements Runnable { public void run() { boolean b = new ZKStock().reduseStock(); if (b) { System.out.println(Thread.currentThread().getName() + ":扣减库存成功!"); } else { System.out.println(Thread.currentThread().getName() + ":扣减库存失败!"); } } } public static void main(String[] args) { new Thread(new zkTest (), "用户张三").start(); new Thread(new zkTest (), "用户李四").start(); } }
这里我定义了一个库存 inventory 值为1,并且 2 个线程一起去扣减库存,发现没有问题,扣除正常,如果这时睡眠 1 秒,看看会出现什么情况:
public static boolean reduseStock() { if (inventory > 0) { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } inventory--; return true; } else { return false; } }
这时用户发现自己拿到的库存都是 1,两个用户都认为自己抢到了,都做了减一的 *** 作,但是等所有人都执行完,再去 set 值的时候,发现其实已经超卖了。
那怎么解决这个问题?这时我们会想到用同步代码块或者锁机制来解决,但是sync,lock也只能保证当前机器线程安全,这样分布式访问还是有问题。
private static Lock lock=new ReentrantLock(); static class zkTest implements Runnable { public void run() { //上锁 lock.lock(); //调用减少库存的方法 boolean b = new ZKStock().reduseStock(); //解锁 lock.unlock(); if (b) { System.out.println(Thread.currentThread().getName() + ":扣减库存成功!"); } else { System.out.println(Thread.currentThread().getName() + ":扣减库存失败!"); } } }
这时候 ZooKeeper 的节点就可以解决这个问题。
ZooKeeper 通过创建临时序列节点来实现分布式锁,适用于顺序执行的程序,大体思路就是创建临时序列节点,找出最小的序列节点,获取分布式锁,程序执行完成之后此序列节点消失,通过 watch 来监控节点的变化,从剩下的节点的找到最小的序列节点,获取分布式锁,执行相应处理,依次类推……
下面以代码测试一下:
1、首先引用 zk
public class ZooKeeperLock implements Lock { //zk客户端 private ZooKeeper zk; //zk是一个目录结构,lock private String node = "/lock"; //锁的名称 private String lockName; //当前线程创建的序列node private ThreadLocalnodeId = new ThreadLocal<>(); //用来同步等待 zkclient 链接到了服务端 private CountDownLatch connectedSignal = new CountDownLatch(1); private final static int sessionTimeout = 3000; private final static byte[] data = new byte[0]; public ZooKeeperLock(String IP_PORT, String lockName) { this.lockName = lockName; try { zk = new ZooKeeper(IP_PORT, sessionTimeout, new Watcher() { @Override public void process(WatchedEvent event) { // 建立连接 if (event.getState() == Event.KeeperState.SyncConnected) { connectedSignal.countDown(); } } }); connectedSignal.await(); Stat stat = zk.exists(node, false); if (null == stat) { // 创建根节点 zk.create(node, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } } catch (Exception e) { throw new RuntimeException(e); } } //获取锁 *** 作 @Override public void lock() { try { // 创建临时子节点 String myNode = zk.create(node + "/" + lockName, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); System.out.println(Thread.currentThread().getName() + myNode + "created"); // 取出所有子节点 List subNodes = zk.getChildren(node, false); TreeSet sortedNodes = new TreeSet<>(); for (String node : subNodes) { sortedNodes.add(node + "/" + node); } String smallNode = sortedNodes.first(); if (myNode.equals(smallNode)) { // 如果是最小的节点,则表示取得锁 System.out.println(Thread.currentThread().getName() + myNode + "get lock"); this.nodeId.set(myNode); return; } String preNode = sortedNodes.lower(myNode); CountDownLatch latch = new CountDownLatch(1); Stat stat = zk.exists(preNode, new LockWatcher(latch));// 同时注册监听。 // 判断比自己小一个数的节点是否存在,如果不存在则无需等待锁,同时注册监听 if (stat != null) { System.out.println(Thread.currentThread().getName() + myNode + " waiting for " + node + "/" + preNode + " released lock"); latch.await();// 等待,这里应该一直等待其他线程释放锁 nodeId.set(myNode); latch = null; } } catch (Exception e) { throw new RuntimeException(e); } } @Override public void unlock() { try { System.out.println(Thread.currentThread().getName() + "unlock "); if (null != nodeId) { zk.delete(nodeId.get(), -1); } nodeId.remove(); } catch (InterruptedException e) { e.printStackTrace(); } catch (KeeperException e) { e.printStackTrace(); } } @Override public void lockInterruptibly() throws InterruptedException { } @Override public boolean tryLock() { return false; } @Override public boolean tryLock(long time, TimeUnit unit) throws InterruptedException { return false; } @Override public Condition newCondition() { return null; } }
3、添加 watch 监听临时顺序节点的删除的类
public class LockWatcher implements Watcher { private CountDownLatch latch = null; public LockWatcher(CountDownLatch latch) { this.latch = latch; } @Override public void process(WatchedEvent event) { if (event.getType() == Event.EventType.NodeDeleted) { latch.countDown(); } } }
最后测试
@SpringBootApplication public class DemoApplication { private static ZooKeeperLock zkLock; static{ zkLock = new ZooKeeperLock("127.0.0.1:2181","stock_zk"); } public static class ZKStock { private static int inventory = 1; public static boolean reduseStock() { if (inventory > 0) { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } inventory--; return true; } else { return false; } } } static class zkTest implements Runnable { public void run() { //上锁 zkLock.lock(); //调用减少库存的方法 boolean b = new ZKStock().reduseStock(); //解锁 zkLock.unlock(); if (b) { System.out.println(Thread.currentThread().getName() + ":扣减库存成功!"); } else { System.out.println(Thread.currentThread().getName() + ":扣减库存失败!"); } } } public static void main(String[] args) { new Thread(new zkTest (), "用户张三").start(); new Thread(new zkTest (), "用户李四").start(); } }
这里创建了我们指定的节点
那么 ZK 在分布式锁中有哪些缺点呢?
Zk性能上可能并没有缓存服务那么高。
因为每次在创建锁和释放锁的过程中,都要动态创建、销毁瞬时节点来实现锁功能。
ZK中创建和删除节点只能通过Leader服务器来执行,然后将数据同步到所有的Follower机器上。
使用Zookeeper也有可能带来并发问题。
建议使用 Redis 的分布式锁
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)