最经典的分布式锁是可重入的公平锁,什么是可重入的公平锁呢,直接讲的话可能会优点枯燥,那么明怀我现在呢就给大家举一个例子,那就是水井取水的例子,如果首先是牌号,按照号的顺序,从小到大进行的打水
什么是可重入锁呢,就是说,你现在取水是按照对应的家庭来进行取水的,假如说你的爸爸取到了水,那么这个时候,你也可以去取水,而不用去取号排队
Zookeeper的临时顺序节点,天生就有一副实现分布式锁的胚子。为什么呢?
1.Zookeeper的每个节点都是一个天然的顺序发号器
在每个节点下面创建临时顺序节点类型,新的子节点会加上一个顺序编号,其实它就像是我们学习过的数据结构,树
2.Zookeeper节点的递增有序性可以确保锁的公平
一个Zookeeper分布式锁,首先需要创建一个父节点,尽量是持久节点,然后每个想要获取锁的线程都可以在该节点下面创建一个临时顺序节点。由于Zk节点是按照创建的顺序依次递增的,为了确保公平,可以简单地规定,编号最小的那个节点表示获得了锁
1.3Zookeeper的节点监听机制可以保障占有锁的传递有序而且高效
接下来需要做的就是每一个节点只需要监听紧挨着自己的前一个节点,一旦前一个节点被删除了,就再进行依次判断,看看自己的序号是不是最小的,如果是的话,那么就可以获得锁。
其内部的优势就是一旦因为网络的原因与Zookeeper集群服务器失去联系的话,那么其临时ZNode也会被自动删除。
1.4Zookeeper的节点监听机制能够有效避免羊群效应
Zookeeper这种收尾相接,后面监听前面的方式,可以避免羊群效应。所谓羊群效应就是一个节点挂掉了,其他节点都去监听,然后做出反应,这样会给服务器带来巨大的压力。
接下来就基于Zookeeper实现以下分布式锁
首先,定义了一个锁的接口Lock,很简单,仅仅两个抽象方法:一个加锁方法,一个解锁方法。Lock接口的代码如下:
 package com.crazymakercircle.zk.distributedLock;  /** * 锁的接口 * create by尼恩 @ 疯狂创客圈 **/ public interface Lock {  /** * 加锁方法 * * @return是否成功加锁 */ booleanlock();  /** * 解锁方法 * * @return是否成功解锁 */ booleanunlock(); }
使用ZooKeeper实现分布式锁的算法,流程大致如下:
(1)一把锁,使用一个ZNode节点表示,如果锁对应的ZNode节点不存在,那么先创建ZNode节点。这里假设为“/test/lock”,代表了一把需要创建的分布式锁。
(2)抢占锁的所有客户端,使用锁的ZNode节点的子节点列表来表示;如果某个客户端需要占用锁,则在“/test/lock”下创建一个临时有序的子节点。
这里,所有子节点尽量共用一个有意义的子节点前缀。
如果子节点前缀为“/test/lock/seq-”,则第一个客户端对应的子节点为“/test/lock/seq-000000000”,第二个为“/test/lock/seq-000000001”,以此类推。
如果子节点前缀为“/test/lock/”,则第一个客户端对应的子节点为“/test/lock/000000000”,第二个为“/test/lock/000000001”,以此类推,也非常直观。
(3)如果判定客户端是否占有锁呢?很简单,客户端创建子节点后,需要进行判断:自己创建的子节点是否为当前子节点列表中序号最小的子节点。如果是,则认为加锁成功;如果不是,则监听前一个ZNode子节点的变更消息,等待前一个节点释放锁。
(4)一旦队列中后面的节点获得前一个子节点的变更通知,则开始进行判断,判断自己是否为当前子节点列表中序号最小的子节点,如果是,则认为加锁成功;如果不是,则持续监听,一直到获得锁。
(5)获取锁后,开始处理业务流程。在完成业务流程后,删除自己对应的子节点,完成释放锁的工作,以便后面的节点能捕获到节点的变更通知,获得分布式锁。
加锁的方法是lock(),lock()加锁的方式就是先去尝试着加锁,如果加锁失败就去等待,然后再重复,代码如下:
package com.crazymakercircle.zk.distributedLock;  import com.crazymakercircle.zk.ZKclient; import lombok.extern.slf4j.Slf4j; import org.apache.curator.framework.Curatorframework; import org.apache.ZooKeeper.WatchedEvent; import org.apache.ZooKeeper.Watcher;  import java.util.Collections; import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger;  /** * 分布式锁的实现 * create by尼恩 @ 疯狂创客圈 **/ @Slf4j public class ZkLock implements Lock { //ZkLock的节点链接 private static final String ZK_PATH = "/test/lock"; private static final String LOCK_PREFIX = ZK_PATH + "/"; private static final long WAIT_TIME = 1000; //Zk客户端 Curatorframework client = null;  private String locked_short_path = null; private String locked_path = null; private String prior_path = null; final AtomicIntegerlockCount = new AtomicInteger(0); private Thread thread;  public ZkLock() { ZKclient.instance.init(); if (! ZKclient.instance.isNodeExist(ZK_PATH)) { ZKclient.instance.createNode(ZK_PATH, null); } client = ZKclient.instance.getClient(); }  /** * 加锁的实现 * * @return是否加锁成功 */ @Override public booleanlock() {  //可重入,确保同一线程可以重复加锁 synchronized (this) { if (lockCount.get() == 0) { thread = Thread.currentThread(); lockCount.incrementAndGet(); } else { if (! thread.equals(Thread.currentThread())) { return false; } lockCount.incrementAndGet(); return true; } }  try { boolean locked = false;  //首先尝试着去加锁 locked = tryLock();  if (locked) { return true; }  //如果加锁失败就去等待 while (! locked) {  //等待 await();  // 获取等待的子节点列表 Listwaiters = getWaiters();  //判断,是否加锁成功 if (checkLocked(waiters)) { locked = true; } } return true; } catch (Exception e) { e.printStackTrace(); unlock(); } return false; } //...省略其他的方法 }
尝试加锁的tryLock方法是关键,它做了两件重要的事情:
(1)创建临时顺序节点,并且保存自己的节点路径。
(2)判断是否是第一个,如果是第一个,则加锁成功。如果不是,就找到前一个ZNode节点,并且把它的路径保存到prior_path。
tryLock()方法的代码如下:
```java  package com.crazymakercircle.zk.distributedLock; //..... 省略import /** * 分布式锁的实现 * create by尼恩 @ 疯狂创客圈 **/ @Slf4j public class ZkLock implements Lock {  /** * 尝试加锁 * * @return是否加锁成功 * @throws Exception异常 */ private booleantryLock() throws Exception { //创建临时ZNode节点 locked_path = ZKclient.instance .createEphemeralSeqNode(LOCK_PREFIX); if (null == locked_path) { throw new Exception("zk error"); }  //取得加锁的排队编号 locked_short_path = getShorPath(locked_path);  //获取加锁的队列 Listwaiters = getWaiters();  //获取等待的子节点列表,判断自己是否第一个 if (checkLocked(waiters)) { return true; }  // 判断自己排第几个 int index = Collections.binarySearch(waiters, locked_short_path); if (index < 0) { // 网络抖动,获取到的子节点列表里可能已经没有自己了 throw new Exception("节点没有找到: " + locked_short_path); }  //如果自己没有获得锁 //保存前一个节点,稍候会监听前一个节点 prior_path = ZK_PATH + "/" + waiters.get(index -1); return false; }  //...省略其他的方法 }
创建临时顺序节点后,它的完整路径存放在locked_path成员中。另外,还截取了一个后缀路径,放在locked_short_path成员中,后缀路径是一个短路径,只有完整路径的最后一层。为什么要单独保存短路径呢?因为获取的远程子节点列表中的其他路径都是短路径,只有最后一层。为了方便进行比较,也把自己的短路径保存下来。
创建了自己的临时节点后,调用checkLocked方法,判断是否锁定成功。如果锁定成功,则返回true;如果自己没有获得锁,则要监听前一个节点。找出前一个节点的路径,保存在prior_path成员中,供后面的await()等待方法用于监听。在介绍await()等待方法之前,先说下checkLocked锁定判断方法。
在checkLocked()方法中,判断是否可以持有锁。判断规则很简单:当前创建的节点是否在上一步获取到的子节点列表的第一个位置:
· 如果是,说明可以持有锁,返回true,表示加锁成功。
· 如果不是,说明有其他线程早已先持有了锁,返回false。
checkLocked()方法的代码如下:  package com.crazymakercircle.zk.distributedLock; //..... 省略import /** * 分布式锁的实现 * create by尼恩 @ 疯狂创客圈 **/ @Slf4j public class ZkLock implements Lock {  /** * 判断是否加锁成功 * @param waiters排队列表 * @return成功状态 */ private booleancheckLocked(Listwaiters) {  //节点按照编号,升序排列 Collections.sort(waiters);  // 如果是第一个,代表自己已经获得了锁 if (locked_short_path.equals(waiters.get(0))) { log.info("成功地获取分布式锁,节点为{}", locked_short_path); return true; } return false; }  //...省略其他的方法 }
checkLocked方法比较简单,将参与排队的所有子节点列表从小到大根据节点名称进行排序。排序主要依靠节点的编号,也就是后10位数字,因为前缀都是一样的。排序之后,进行判断,如果自己的locked_short_path编号位置排在第一个,代表自己已经获得了锁。如果不是,则会返回false。
如果checkLocked()为false,那么外层的调用方法一般会执行等待方法await()执行争夺锁失败以后的等待逻辑。
await()也很简单,就是监听前一个ZNode节点prior_path成员的删除事件,代码如下:
 package com.crazymakercircle.zk.distributedLock;  //..... 省略import  /** * 分布式锁的实现 * create by尼恩 @ 疯狂创客圈 **/ @Slf4j public class ZkLock implements Lock {  /** * 等待 * 监听前一个节点的删除事件 * @throws Exception可能会有Zk异常、网络异常 */ private void await() throws Exception {  if (null == prior_path) { throw new Exception("prior_path error"); }  final CountDownLatch latch = new CountDownLatch(1);  //监听方式一: Watcher一次性订阅 //订阅比自己编号小的顺序节点的删除事件 Watcher w = new Watcher() { @Override public void process(WatchedEventwatchedEvent) { System.out.println("监听到的变化watchedEvent = " + watchedEvent); log.info("[WatchedEvent]节点删除"); latch.countDown(); } };  //开始监听 client.getData().usingWatcher(w).forPath(prior_path);  //监听方式二:TreeCache订阅 //订阅比自己编号小的顺序节点的删除事件 /* TreeCachetreeCache = new TreeCache(client, prior_path); TreeCacheListener l = new TreeCacheListener() { @Override public void childEvent(Curatorframework client, TreeCacheEvent event) throws Exception { ChildData data = event.getData(); if (data ! = null) { switch (event.getType()) { case NODE_REMOVED: log.debug("[TreeCache]节点删除,path={}, data={}", data.getPath(), data.getData()); latch.countDown(); break; default: break; } } } }; //开始监听 treeCache.getListenable().addListener(l); treeCache.start(); */ //限时等待,最长加锁时间为3s latch.await(WAIT_TIME, TimeUnit.SECONDS); }  //...省略其他的方法 }
首先添加一个Watcher监听,而监听的节点正是前面所保存在prior_path成员的前一个节点的路径。这里,仅仅去监听自己前一个节点的变动,而不是其他节点的变动,以提升效率。完成监听之后,调用latch.await(),线程进入等待状态,一直到线程被监听回调代码中的latch.countDown()所唤醒,或者等待超时。
在上面的代码中,监听前一个节点的删除可以使用两种监听方式:
· Watcher订阅。
· TreeCache订阅。
两种方式的效果都差不多。但是,这里的删除事件只需要监听一次即可,不需要反复监听,所以,建议使用Watcher一次性订阅。而程序中有关TreeCache订阅的代码已经被注释,供大家参考。一旦前一个节点prior_path被删除,那么就将线程从等待状态唤醒,重新开始一轮锁的争夺,直到获取锁,再完成业务处理。
至此,关于lock加锁的算法还差一点点就介绍完成。这一点,就是实现锁的可重入。什么是可重入呢?只需要保障同一个线程进入加锁的代码,可以重复加锁成功即可。修改前面的lock方法,在前面加上可重入的判断逻辑。代码如下:
 @Override public booleanlock() { //可重入的判断 synchronized (this) { if (lockCount.get() == 0) { thread = Thread.currentThread(); lockCount.incrementAndGet(); } else { if (! thread.equals(Thread.currentThread())) { return false; } lockCount.incrementAndGet(); return true; } } //.... }
为了变成可重入,在代码中增加了一个加锁的计数器lockCount,计算重复加锁的次数。如果是同一个线程加锁,只需要增加次数,直接返回,表示加锁成功。至此,lock()方法已经介绍完了,接下来,就是去释放锁
10.6.5 释放锁的实现
Lock接口中的unLock()方法用于释放锁,释放锁主要有两个工作:
· 减少重入锁的计数,如果不是0,直接返回,表示成功地释放了一次。
· 如果计数器为0,移除Watchers监听器,并且删除创建的ZNode临时节点。
unLock()方法的代码如下:
 package com.crazymakercircle.zk.distributedLock;  //..... 省略import  /** * 分布式锁的实现 * create by尼恩 @ 疯狂创客圈 **/  @Slf4j public class ZkLock implements Lock {  /** * 释放锁 * * @return是否成功释放锁 */ @Override public booleanunlock() {  //只有加锁的线程能够解锁 if (! thread.equals(Thread.currentThread())) { return false; }  //减少可重入的计数 int newLockCount = lockCount.decrementAndGet();  //计数不能小于0 if (newLockCount< 0) { throw new IllegalMonitorStateException("计数不对: " + locked_path); }  //如果计数不为0,直接返回 if (newLockCount ! = 0) { return true; } try { //删除临时节点 if (ZKclient.instance.isNodeExist(locked_path)) { client.delete().forPath(locked_path); } } catch (Exception e) { e.printStackTrace(); return false; } return true; }  //...省略其他的方法 }
在程序中,为了尽量保证线程的安全,可重入计数器的类型不是int类型,而是Java并发包中的原子类型——AtomicInteger。
10.6.6 分布式锁的使用
编写一个用例,测试一下ZLock的使用,代码如下:
package com.crazymakercircle.zk.distributedLock;  import com.crazymakercircle.cocurrent.FutureTaskScheduler; import com.crazymakercircle.zk.ZKclient; import lombok.extern.slf4j.Slf4j; import org.apache.curator.framework.Curatorframework; import org.apache.curator.framework.recipes.locks.InterProcessMutex; import org.junit.Test;  /** * 测试分布式锁 * create by尼恩 @ 疯狂创客圈 **/  @Slf4j public class ZkLockTester { //需要锁来保护的公共资源 //变量 int count = 0;  /** * 测试自定义分布式锁 * * @throws InterruptedException异常 */ @Test public void testLock() throws InterruptedException { //10个并发任务 for (int i = 0; i< 10; i++) { FutureTaskScheduler.add(() -> { //创建锁 ZkLock lock = new ZkLock(); lock.lock(); //每条线程执行10次累加 for (int j = 0; j < 10; j++) { //公共的资源变量累加 count++; } try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } log.info("count = " + count); //释放锁 lock.unlock(); }); } Thread.sleep(Integer.MAX_VALUE); }  }1.7 Curator的InterProcessMutex可重入锁
Zlock实现的主要价值是展示一下分布式锁的原理和基础开发。在实际的开发中,如果需要使用到分布式锁,不建议去自己“重复造轮子”,而建议直接使用Curator客户端中的各种官方实现的分布式锁,例如其中的InterProcessMutex可重入锁。
这里提供一个InterProcessMutex可重入锁的使用实例,代码如下:
 package com.crazymakercircle.zk.distributedLock;  //...省略import  /** * 测试分布式锁 * create by尼恩 @ 疯狂创客圈 **/  @Slf4j public class ZkLockTester {  //需要锁来保护的公共资源 //变量 int count = 0;  /** * 测试ZK自带的互斥锁 * * @throws InterruptedException异常 */ @Test public void testzkMutex() throws InterruptedException {  Curatorframework client = ZKclient.instance.getClient(); //创建互斥锁 final InterProcessMutexzkMutex = new InterProcessMutex(client, "/mutex"); //每条线程执行10次累加 for (int i = 0; i< 10; i++) { FutureTaskScheduler.add(() -> { try { //获取互斥锁 zkMutex.acquire(); for (int j = 0; j < 10; j++) { //公共的资源变量累加 count++; } try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } log.info("count = " + count); //释放互斥锁 zkMutex.release(); } catch (Exception e) { e.printStackTrace(); } }); } Thread.sleep(Integer.MAX_VALUE); } }
最后,总结一下ZooKeeper分布式锁:
(1)优点:ZooKeeper分布式锁(如InterProcessMutex),能有效地解决分布式问题,不可重入问题,使用起来也较为简单。
(2)缺点:ZooKeeper实现的分布式锁,性能并不太高。为什么呢?因为每次在创建锁和释放锁的过程中,都要动态创建、销毁暂时节点来实现锁功能。大家知道,Zk中创建和删除节点只能通过Leader(主)服务器来执行,然后Leader服务器还需要将数据同步到所有的Follower(从)服务器上,这样频繁的网络通信,性能的短板是非常突出的。
总之,在高性能、高并发的应用场景下,不建议使用ZooKeeper的分布式锁。而由于ZooKeeper的高可用性,因此在并发量不是太高的应用场景中,还是推荐使用ZooKeeper的分布式锁。
目前分布式锁,比较成熟、主流的方案有两种:
(1)基于Redis的分布式锁。适用于并发量很大、性能要求很高而可靠性问题可以通过其他方案去弥补的场景。
(2)基于ZooKeeper的分布式锁。适用于高可靠(高可用),而并发量不是太高的场景。
所以,这里没有谁好谁坏的问题,而是谁更合适的问题。
好啦,这一期的分布式锁的原理呢就讲完了,但其实相较于Redis来说的话,Zookeeper确实很不适合来作为处理一些高并发的事件,而是适合高可用的事件,所以兄弟们在选择的时候还是应该谨慎选择,好啦,这一期的文章就到这里啦,我们下期见!
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)