跨JVM工作的可重入读/写互斥锁,使用Zookeeper来持有锁,所有JVM中使用相同锁路径的所有进程都将实现进程间临界区。这个互斥锁是公平的,每个用户都会按照请求的顺序获得互斥锁(从ZK的角度来看)。
读写锁维护一对关联的锁,一个用于只读 *** 作,一个用于写 *** 作。只要没有写锁,读锁可以被多个用户同时持有,而写锁是独占的。
读写锁允许从写锁降级为读锁,方法是先获取写锁,然后就可以获取读锁。但是,无法从读锁升级到写锁。
测试pom.xml:
4.0.0 com.kaven zookeeper1.0-SNAPSHOT 8 8 org.apache.curator curator-recipes5.2.0 org.projectlombok lombok1.18.22
CuratorframeworkProperties类(提供Curatorframework需要的一些配置信息,以及创建Curatorframework实例的方法):
package com.kaven.zookeeper; import org.apache.curator.RetryPolicy; import org.apache.curator.framework.Curatorframework; import org.apache.curator.framework.CuratorframeworkFactory; import org.apache.curator.framework.imps.CuratorframeworkState; import org.apache.curator.retry.ExponentialBackoffRetry; public class CuratorframeworkProperties { // 连接地址 public static final String CONNECT_ADDRESS = "192.168.1.3:9000"; // 连接超时时间 public static final int CONNECTION_TIMEOUT_MS = 40000; // Session超时时间 public static final int SESSION_TIMEOUT_MS = 10000; // 命名空间 public static final String NAMESPACE = "MyNamespace"; // 重试策略 public static final RetryPolicy RETRY_POLICY = new ExponentialBackoffRetry(1000, 3); public static Curatorframework getCuratorframework() { // 创建Curatorframework实例 Curatorframework curator = CuratorframeworkFactory.builder() .connectString(CuratorframeworkProperties.CONNECT_ADDRESS) .retryPolicy(CuratorframeworkProperties.RETRY_POLICY) .connectionTimeoutMs(CuratorframeworkProperties.CONNECTION_TIMEOUT_MS) .sessionTimeoutMs(CuratorframeworkProperties.SESSION_TIMEOUT_MS) .namespace(CuratorframeworkProperties.NAMESPACE) .build(); curator.start(); assert curator.getState().equals(CuratorframeworkState.STARTED); return curator; } }
InterProcessReadWriteLockRunnable类(实现了Runnable接口,模拟分布式节点获取分布式锁):
package com.kaven.zookeeper; import lombok.SneakyThrows; import org.apache.curator.framework.Curatorframework; import org.apache.curator.framework.recipes.locks.InterProcessReadWriteLock; import java.nio.charset.StandardCharsets; import java.util.Random; public class InterProcessReadWriteLockRunnable implements Runnable{ @SneakyThrows @Override public void run() { // 使用不同的Curatorframework实例,表示不同的分布式节点 Curatorframework curator = CuratorframeworkProperties.getCuratorframework(); // 分布式锁的路径 String basePath = "/kaven"; // 创建InterProcessReadWriteLock实例,用于提供分布式锁的功能 InterProcessReadWriteLock readWriteLock = new InterProcessReadWriteLock(curator, basePath, "分布式读写锁".getBytes(StandardCharsets.UTF_8)); // 根据随机数来决定获取写锁还是读锁 Random random = new Random(); if(random.nextInt(10000) > 5000) { // 获取写锁 readWriteLock.writeLock().acquire(); System.out.println(Thread.currentThread().getName() + "获取写锁"); Thread.sleep(2000); System.out.println(Thread.currentThread().getName() + "释放写锁"); // 释放写锁 readWriteLock.writeLock().release(); } else { // 获取读锁 readWriteLock.readLock().acquire(); System.out.println(Thread.currentThread().getName() + "获取读锁"); Thread.sleep(2000); System.out.println(Thread.currentThread().getName() + "释放读锁"); // 释放读锁 readWriteLock.readLock().release(); } } }
启动类:
package com.kaven.zookeeper; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class Application { private static final ExecutorService EXECUTOR_SERVICE = Executors.newCachedThreadPool(); public static void main(String[] args) throws Exception { // 分布式节点处理业务 for (int i = 0; i < 15; i++) { EXECUTOR_SERVICE.execute(new InterProcessReadWriteLockRunnable()); } } }
模拟15个分布式节点获取分布式锁,输出如下所示:
pool-1-thread-8获取写锁 pool-1-thread-8释放写锁 pool-1-thread-13获取写锁 pool-1-thread-13释放写锁 pool-1-thread-12获取读锁 pool-1-thread-15获取读锁 pool-1-thread-12释放读锁 pool-1-thread-15释放读锁 pool-1-thread-1获取写锁 pool-1-thread-1释放写锁 pool-1-thread-6获取读锁 pool-1-thread-2获取读锁 pool-1-thread-14获取读锁 pool-1-thread-9获取读锁 pool-1-thread-2释放读锁 pool-1-thread-6释放读锁 pool-1-thread-14释放读锁 pool-1-thread-9释放读锁 pool-1-thread-4获取写锁 pool-1-thread-4释放写锁 pool-1-thread-11获取写锁 pool-1-thread-11释放写锁 pool-1-thread-5获取读锁 pool-1-thread-5释放读锁 pool-1-thread-10获取写锁 pool-1-thread-10释放写锁 pool-1-thread-7获取写锁 pool-1-thread-7释放写锁 pool-1-thread-3获取读锁 pool-1-thread-3释放读锁
为了验证输出是否符合预期,可以通过ZooKeeper提供的客户端获取锁路径下的所有节点,如下图所示:
排序后如下所示,和输出是对应的(读锁可以被多个用户同时持有,而写锁是独占的)。
/MyNamespace/kaven/_c_cd949fbc-c779-46e6-a7d8-4c4c90f13f23-__WRIT__0000000000 /MyNamespace/kaven/_c_a4934270-e9c4-40c0-8b89-5fb369a7cfaa-__WRIT__0000000001 /MyNamespace/kaven/_c_5a03751b-7f45-4813-99f6-49671c473367-__READ__0000000002 /MyNamespace/kaven/_c_babe2eec-7259-484f-9f76-51a5fc45d607-__READ__0000000003 /MyNamespace/kaven/_c_405e6cc8-481b-4d9a-8cf1-a83ea69912dd-__WRIT__0000000004 /MyNamespace/kaven/_c_a870b175-c082-4661-9383-4a82a021e283-__READ__0000000005 /MyNamespace/kaven/_c_3fa721b5-1015-46e8-8da3-20b333b76ca4-__READ__0000000006 /MyNamespace/kaven/_c_03e0208c-fb25-4229-9534-4dc6c295d6a8-__READ__0000000007 /MyNamespace/kaven/_c_5428e1af-c9fc-4afd-92f7-76fbbb545dcc-__READ__0000000008 /MyNamespace/kaven/_c_395407f3-c402-40b4-abe2-5750f43ddd9d-__WRIT__0000000009 /MyNamespace/kaven/_c_c7a1a90c-8b76-47fc-8c9e-1dc15c43a8bd-__WRIT__0000000010 /MyNamespace/kaven/_c_800861ea-8da2-4ff5-b237-19d8b939393e-__READ__0000000011 /MyNamespace/kaven/_c_3d357ec6-5288-4719-ba3b-7f98eceb32d6-__WRIT__0000000012 /MyNamespace/kaven/_c_e8067da5-bf08-4876-8200-1746dbc2ce77-__WRIT__0000000013 /MyNamespace/kaven/_c_8d4e07a2-08b9-4c1d-91d4-3b0d76844b33-__READ__0000000014锁降级
public class InterProcessReadWriteLockRunnable implements Runnable{ @SneakyThrows @Override public void run() { // 使用不同的Curatorframework实例,表示不同的分布式节点 Curatorframework curator = CuratorframeworkProperties.getCuratorframework(); // 分布式锁的路径 String basePath = "/kaven"; // 创建InterProcessReadWriteLock实例,用于提供分布式锁的功能 InterProcessReadWriteLock readWriteLock = new InterProcessReadWriteLock(curator, basePath, "分布式读写锁".getBytes(StandardCharsets.UTF_8)); // 获取写锁 readWriteLock.writeLock().acquire(); System.out.println(Thread.currentThread().getName() + "获取写锁"); Thread.sleep(2000); // 锁降级 readWriteLock.readLock().acquire(); System.out.println(Thread.currentThread().getName() + "获取读锁,锁降级成功"); Thread.sleep(2000); // 释放读锁 System.out.println(Thread.currentThread().getName() + "释放读锁"); readWriteLock.readLock().release(); // 释放写锁 System.out.println(Thread.currentThread().getName() + "释放写锁"); readWriteLock.writeLock().release(); } }
public class Application { private static final ExecutorService EXECUTOR_SERVICE = Executors.newCachedThreadPool(); public static void main(String[] args) throws Exception { // 分布式节点处理业务 for (int i = 0; i < 5; i++) { EXECUTOR_SERVICE.execute(new InterProcessReadWriteLockRunnable()); } } }
输出如下所示:
pool-1-thread-3获取写锁 pool-1-thread-3获取读锁,锁降级成功 pool-1-thread-3释放读锁 pool-1-thread-3释放写锁 pool-1-thread-1获取写锁 pool-1-thread-1获取读锁,锁降级成功 pool-1-thread-1释放读锁 pool-1-thread-1释放写锁 pool-1-thread-4获取写锁 pool-1-thread-4获取读锁,锁降级成功 pool-1-thread-4释放读锁 pool-1-thread-4释放写锁 pool-1-thread-5获取写锁 pool-1-thread-5获取读锁,锁降级成功 pool-1-thread-5释放读锁 pool-1-thread-5释放写锁 pool-1-thread-2获取写锁 pool-1-thread-2获取读锁,锁降级成功 pool-1-thread-2释放读锁 pool-1-thread-2释放写锁
InterProcessReadWriteLock类除了构造方法外,就只有readLock和writeLock这两个方法可以调用,而这两个方法的返回值是InterProcessMutex实例,因此分布式读写锁InterProcessReadWriteLock的实现是基于分布式锁InterProcessMutex。
ZooKeeper : Curator框架之分布式锁InterProcessMutex
Curator框架的分布式锁InterProcessReadWriteLock就介绍到这里,如果博主有说错的地方或者大家有不同的见解,欢迎大家评论补充。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)