I previously wrote a close reading of the ZooKeeper classics; today let's look at operating ZK with Curator.
In fact there are two other ways to operate ZK from Java: the native Java API and ZkClient. Overall, though, Curator is the simplest, and the ZK book covers this part in detail. Since I have an environment handy, I reproduced it over the weekend, mainly to lead into the feature ZK was originally designed for: distributed locks. In the next section I will implement a distributed lock on top of ZK; in this section I'll first write out the Curator code for operating ZK.
1. Operating ZK with Curator

```java
package com.example.curator;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.BackgroundCallback;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.framework.recipes.cache.*;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.data.Stat;
import org.junit.jupiter.api.Test;

import java.util.List;

public class CuratorTest {

    private CuratorFramework client;

    @Test
    public void testConnection() throws Exception {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 10);
        // Two factory styles for creating a client
        client = CuratorFrameworkFactory.newClient("localhost:2181", retryPolicy);
        CuratorFramework client5 = CuratorFrameworkFactory.builder()
                .connectString("121.43.170.118:2181")
                .namespace("txl")
                .retryPolicy(retryPolicy)
                .sessionTimeoutMs(60 * 1000)
                .connectionTimeoutMs(15 * 1000).build();
        client.start();

        // Create a node with content
        String path = client.create().forPath("/txl333", "sb".getBytes());
        // Create multi-level nodes directly
        String path1 = client.create().creatingParentsIfNeeded().forPath("/test/txl333", "sb".getBytes());

        // Equivalent of the command-line get
        byte[] bytes = client.getData().forPath("/txl333");
        String data = new String(bytes);

        // List all nodes under the namespace; path defaults to "/"
        List<String> strings = client.getChildren().forPath("/");

        // Read node status (czxid, mzxid, etc.) along with the data
        Stat status = new Stat();
        byte[] bytes1 = client.getData().storingStatIn(status).forPath("/txl333");

        // Equivalent of the command-line set
        Stat stat1 = client.setData().forPath("/txl333", "txllvwy".getBytes());
        // In practice a versioned update is more common:
        // first read the Stat, then update using its version
        Stat status1 = new Stat();
        int version = status1.getVersion();
        Stat stat = client.setData().withVersion(version).forPath("/txl333", "txlYYlvwy".getBytes());
        // A mismatched version throws an exception

        // Equivalent of the command-line delete
        client.delete().forPath("/txl333");
        // Delete a path together with its children
        client.delete().deletingChildrenIfNeeded().forPath("/txl333");
        // A delete that is guaranteed to succeed
        client.delete().guaranteed().forPath("/txl333");
        // A delete with a callback
        client.delete().guaranteed().inBackground(new BackgroundCallback() {
            @Override
            public void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent) throws Exception {
                System.out.println("deleted");
            }
        }).forPath("/txl333");
        client.close();
    }

    @Test
    public void testNodeCacheWatcher() throws Exception {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 10);
        client = CuratorFrameworkFactory.newClient("121.43.170.118:2181", retryPolicy);
        client.start();
        // NodeCache: watch a single node
        final NodeCache nodeCache = new NodeCache(client, "/app");
        nodeCache.getListenable().addListener(new NodeCacheListener() {
            @Override
            public void nodeChanged() throws Exception {
                byte[] data = nodeCache.getCurrentData().getData();
                System.out.println(new String(data));
            }
        });
        // Start watching, similar to a channel's sync method
        nodeCache.start();
        while (true) {
        }
    }

    @Test
    public void testPathChildrenCacheWatcher() throws Exception {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 10);
        CuratorFramework client5 = CuratorFrameworkFactory.builder()
                .connectString("121.43.170.118:2181")
                .retryPolicy(retryPolicy)
                .sessionTimeoutMs(60 * 1000)
                .connectionTimeoutMs(15 * 1000).build();
        client5.start();
        final PathChildrenCache childrenCache = new PathChildrenCache(client5, "/", true);
        childrenCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
        List<ChildData> childDataList = childrenCache.getCurrentData();
        System.out.println("Data of the node's current children:");
        for (ChildData cd : childDataList) {
            String childData = new String(cd.getData());
            System.out.println(childData);
        }
        childrenCache.getListenable().addListener(new PathChildrenCacheListener() {
            public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
                if (event.getType().equals(PathChildrenCacheEvent.Type.INITIALIZED)) {
                    System.out.println("children initialized");
                } else if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_ADDED)) {
                    System.out.println("child added: " + event.getData().getPath());
                    System.out.println("child data: " + new String(event.getData().getData()));
                } else if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_REMOVED)) {
                    System.out.println("child removed: " + event.getData().getPath());
                } else if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)) {
                    System.out.println("child updated: " + event.getData().getPath());
                    System.out.println("new child data: " + new String(event.getData().getData()));
                }
            }
        });
        while (true) {
        }
    }

    @Test
    public void testTreeCache() throws Exception {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 10);
        CuratorFramework client5 = CuratorFrameworkFactory.builder()
                .connectString("121.43.170.118:2181")
                .retryPolicy(retryPolicy)
                .sessionTimeoutMs(60 * 1000)
                .connectionTimeoutMs(15 * 1000).build();
        client5.start();
        TreeCache treeCache = new TreeCache(client5, "/");
        treeCache.getListenable().addListener(new TreeCacheListener() {
            @Override
            public void childEvent(CuratorFramework curatorFramework, TreeCacheEvent treeCacheEvent) throws Exception {
                System.out.println("node changed");
                System.out.println("node path: " + treeCacheEvent.getData().getPath());
                System.out.println("node data: " + new String(treeCacheEvent.getData().getData()));
            }
        });
        treeCache.start();
        while (true) {
        }
    }
}
```
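Both connection snippets above pass an ExponentialBackoffRetry(3000, 10), which retries with a randomized, exponentially growing sleep between attempts. A minimal sketch of that idea (the formula, the `BackoffSketch` class, and the cap parameter are my own illustration, not copied from Curator's source):

```java
import java.util.Random;

public class BackoffSketch {

    // Illustrative randomized exponential backoff: for retry n, sleep a random
    // multiple of baseSleepMs, with the multiplier drawn from [1, 2^(n+1)),
    // capped at maxSleepMs. Not Curator's exact implementation.
    static long sleepMs(long baseSleepMs, int retryCount, long maxSleepMs, Random rnd) {
        int multiplier = Math.max(1, rnd.nextInt(1 << (retryCount + 1)));
        return Math.min(baseSleepMs * multiplier, maxSleepMs);
    }

    public static void main(String[] args) {
        Random rnd = new Random(42);
        for (int retry = 0; retry < 10; retry++) {
            // The sleep window roughly doubles per retry, so a flaky
            // connection is retried quickly at first, then ever more slowly
            System.out.println("retry " + retry + " -> sleep " + sleepMs(3000, retry, 60_000, rnd) + " ms");
        }
    }
}
```

The randomization matters: if many clients lose the same ZK server at once, jittered sleeps keep them from reconnecting in lockstep.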
2. Implementing a Distributed Lock with Curator
Core idea: exploit the properties of ephemeral nodes. Create a node to acquire the lock, delete it when releasing the resource.
Steps:
1. To acquire the lock, a client creates an ephemeral sequential node under the Lock node (ephemeral so the lock can always be released, sequential to form a queue).
2. Every connected client lists all children under the Lock node; the client whose node has the smallest sequence number considers itself the lock holder and starts operating on the critical-section resource.
3. A client whose node is not the smallest cannot acquire the lock, so it registers a Watcher on the node whose sequence number immediately precedes its own.
4. When that preceding node is deleted, the client checks again whether its own node is now the smallest; if so it takes the lock, otherwise it repeats the steps above.
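The election step in the list above can be sketched as pure logic over the child-node names, with no live ZK connection (the `lock-…` names below are hypothetical sequential suffixes, and `LockQueueSketch` is my own helper, not part of Curator):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class LockQueueSketch {

    // Given the children of the lock node (ephemeral sequential names such as
    // "lock-0000000003") and our own node, decide whether we hold the lock.
    // Returns null if we are the smallest (we hold the lock); otherwise
    // returns the immediate predecessor, i.e. the one node to watch.
    // This is only the election step of the recipe, not a full lock.
    static String nodeToWatch(List<String> children, String ourNode) {
        Collections.sort(children); // fixed-width sequence suffixes sort lexicographically
        int idx = children.indexOf(ourNode);
        if (idx <= 0) {
            return null; // smallest node: lock acquired
        }
        return children.get(idx - 1); // watch only our predecessor, not every node
    }

    public static void main(String[] args) {
        List<String> children = new ArrayList<>(
                Arrays.asList("lock-0000000005", "lock-0000000002", "lock-0000000007"));
        System.out.println(nodeToWatch(children, "lock-0000000007")); // prints lock-0000000005
    }
}
```

Watching only the predecessor, rather than the whole Lock node, avoids the "herd effect": when the holder releases, exactly one waiter is woken instead of all of them.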
Curator currently ships five lock recipes:
InterProcessSemaphoreMutex: non-reentrant exclusive lock
InterProcessSemaphoreV2: semaphore
InterProcessMutex: reentrant exclusive lock
InterProcessMultiLock: container for multiple locks
InterProcessReadWriteLock: read-write lock
First, let's simulate multiple processes contending for a critical-section resource.
```java
public class TestMutex {
    public static void main(String[] args) {
        ThreadWithoutMutex threadWithoutMutex = new ThreadWithoutMutex();
        Thread thread1 = new Thread(threadWithoutMutex, "user1");
        Thread thread2 = new Thread(threadWithoutMutex, "user2");
        Thread thread3 = new Thread(threadWithoutMutex, "user3");
        thread1.start();
        thread2.start();
        thread3.start();
    }
}
```

```java
public class ThreadWithoutMutex implements Runnable {

    private int num = 20;

    @Override
    public void run() {
        while (true) {
            if (num > 0) {
                System.out.println(Thread.currentThread().getName() + " got the resource, remaining: " + num);
                num--;
            }
        }
    }
}
```
Without a lock, the output shows anomalies: the same stock count gets handed out more than once, the classic "overselling" problem.
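For contrast: when all the contenders live in one JVM, a plain in-process lock already fixes the overselling; the ZK lock only becomes necessary once they are separate processes. A minimal sketch (the class and thread names here are mine):

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;

public class InProcessLockDemo {

    static int num = 20;
    static final AtomicInteger consumed = new AtomicInteger();
    static final ReentrantLock lock = new ReentrantLock();

    public static void main(String[] args) throws InterruptedException {
        Runnable consumer = () -> {
            while (true) {
                lock.lock();
                try {
                    if (num <= 0) {
                        return; // stock exhausted, exit the thread
                    }
                    num--;                     // check and decrement happen atomically
                    consumed.incrementAndGet(); // count successful "sales"
                } finally {
                    lock.unlock();
                }
            }
        };
        Thread t1 = new Thread(consumer, "user1");
        Thread t2 = new Thread(consumer, "user2");
        Thread t3 = new Thread(consumer, "user3");
        t1.start(); t2.start(); t3.start();
        t1.join(); t2.join(); t3.join();
        System.out.println("consumed = " + consumed.get()); // always exactly 20
    }
}
```

Because the check and the decrement sit inside one lock-protected region, exactly 20 units are ever sold, no matter how the threads interleave.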
Now let's add the ZK distributed lock inside the process. (A classmate asked me a few days ago why I would write a distributed lock inside a single JVM, since that doesn't match real deployments. I'd suggest getting the concepts of process and thread straight before asking that kind of question...)
```java
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;

import java.util.concurrent.TimeUnit;

public class ThreadWithoutMutex implements Runnable {

    private int num = 20;
    private InterProcessMutex lock;

    public ThreadWithoutMutex() {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 10);
        CuratorFramework client5 = CuratorFrameworkFactory.builder()
                .connectString("localhost:2181")
                .retryPolicy(retryPolicy)
                .sessionTimeoutMs(60 * 1000)
                .connectionTimeoutMs(15 * 1000).build();
        client5.start();
        lock = new InterProcessMutex(client5, "/lock");
    }

    @Override
    public void run() {
        while (true) {
            boolean acquired = false;
            try {
                // acquire(timeout, unit) returns false on timeout, so check the result
                acquired = lock.acquire(1, TimeUnit.SECONDS);
                if (acquired && num > 0) {
                    System.out.println(Thread.currentThread().getName() + " got the resource, remaining: " + num);
                    num--;
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                // releasing a lock we never acquired would throw, so guard it
                if (acquired) {
                    try {
                        lock.release();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    }
}
```
The /lock node does not need to be created in advance, and you don't have to register any Watcher by hand. Convenient, isn't it?
The output is now ordered; my debug logging is too verbose, so I won't paste it here.
All in all, a ZK-based distributed lock is much cheaper to build than a database-based one, because ZK is essentially an off-the-shelf, open-source distributed lock. On the other hand, keeping ZK highly available, and the long stalls during leader election, are ZK's pain points; a database is slower, but it needs no extra ZK cluster to maintain, so you can solve the problem with tools already at hand. There is also the Redis distributed lock, which we'll discuss in detail when we get to Redis.