细读经典——ZK的补充:ZK的Java *** 作及分布式锁的实现

细读经典——ZK的补充:ZK的Java *** 作及分布式锁的实现,第1张

细读经典——ZK的补充:ZK的Java *** 作及分布式锁的实现

之前有写过Zookeeper细读经典,今天写写用Curator *** 作ZK。

实际上,使用JAVA *** 作ZK的方式还有JAVA API和ZKClient两种方式,但总体而言,Curator方式最简单,这部分时ZK书中有详细的描述,我这里因为有环境,所以周末复现了一下,主要引出ZK设计最初的功能——分布式锁,下一节我将基于ZK实现一个分布式锁,本节先把Curator *** 作ZK的部分代码写一写。

1、Curator *** 作ZK
package com.example.curator;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.Curatorframework;
import org.apache.curator.framework.CuratorframeworkFactory;
import org.apache.curator.framework.api.BackgroundCallback;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.framework.recipes.cache.*;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.data.Stat;
import org.junit.jupiter.api.Test;

import java.util.List;

public class CuratorTest {

    private Curatorframework client;

    @Test
    public void testConnection() throws Exception {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 10);
        //两种工厂创建方式
        client = CuratorframeworkFactory.newClient("localhost:2181", retryPolicy);
        Curatorframework client5 = CuratorframeworkFactory.builder()
                .connectString("121.43.170.118:2181")
                .namespace("txl")
                .retryPolicy(retryPolicy)
                .sessionTimeoutMs(60 * 1000)
                .connectionTimeoutMs(15 * 1000).build();
        client.start();
        
        //创建带有节点内容的节点
        String path = client.create().forPath("/txl333", "sb".getBytes());
        //直接创建多级节点
        String path1 = client.create().creatingParentsIfNeeded().forPath("/test/txl333", "sb".getBytes());
        
        //对应命令行get方法
        byte[] bytes = client.getData().forPath("/txl333");
        String data = new String(bytes);
        //返回nameSpace下的所有节点,默认‘/’路径
        List strings = client.getChildren().forPath("/");
        //查询节点状态czxid.mzxid等信息
        Stat status = new Stat();
        byte[] bytes1 = client.getData().storingStatIn(status).forPath("/txl333");
        
        //对应set方法
        Stat stat1 = client.setData().forPath("/txl333", "txllvwy".getBytes());
        //但实际上我们更常用的是带状态修改
        //一般先查Stat信息,再根据版本信息修改
        Stat status1 = new Stat();
        int version = status1.getVersion();
        Stat stat = client.setData().withVersion(version).forPath("/txl333", "txlYYlvwy".getBytes());
        //version不匹配的话会抛出异常
        
        //对应delete方法
        client.delete().forPath("/txl333");
        //删除带有子节点的路径
        client.delete().deletingChildrenIfNeeded().forPath("/txl333");
        //必须成功的删除
        client.delete().guaranteed().forPath("/txl333");
        //带有回调的删除
        client.delete().guaranteed().inBackground(new BackgroundCallback() {
            @Override
            public void processResult(Curatorframework curatorframework, CuratorEvent curatorEvent) throws Exception {
                System.out.println("删除成功");
            }
        }).forPath("txl333");

        client.close();
    }

    @Test
    public void testNodeCacheWatcher() throws Exception {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 10);
        client = CuratorframeworkFactory.newClient("121.43.170.118:2181", retryPolicy);
        //NodeCache节点监听
        final NodeCache nodeCache = new NodeCache(client, "/app");
        nodeCache.getListenable().addListener(new NodeCacheListener() {
            @Override
            public void nodeChanged() throws Exception {
                byte[] data = nodeCache.getCurrentData().getData();
                System.out.println(new String(data));
            }
        });
        //开启监听,类似channel的sync方法
        nodeCache.start();
        while (true) {

        }
    }

    
    @Test
    public void testPathChildrenCacheWatcher() throws Exception {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 10);
        Curatorframework client5 = CuratorframeworkFactory.builder()
                .connectString("121.43.170.118:2181")
                .retryPolicy(retryPolicy)
                .sessionTimeoutMs(60 * 1000)
                .connectionTimeoutMs(15 * 1000).build();
        client5.start();
        final PathChildrenCache childrenCache = new PathChildrenCache(client5, "/", true);

        childrenCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);

        List childDataList = childrenCache.getCurrentData();
        System.out.println("当前数据节点的子节点数据列表:");
        for (ChildData cd : childDataList) {
            String childData = new String(cd.getData());
            System.out.println(childData);
        }

        childrenCache.getListenable().addListener(new PathChildrenCacheListener() {
            public void childEvent(Curatorframework ient, PathChildrenCacheEvent event) throws Exception {
                if (event.getType().equals(PathChildrenCacheEvent.Type.INITIALIZED)) {
                    System.out.println("子节点初始化成功");
                } else if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_ADDED)) {
                    System.out.println("添加子节点路径:" + event.getData().getPath());
                    System.out.println("子节点数据:" + new String(event.getData().getData()));
                } else if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_REMOVED)) {
                    System.out.println("删除子节点:" + event.getData().getPath());
                } else if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)) {
                    System.out.println("修改子节点路径:" + event.getData().getPath());
                    System.out.println("修改子节点数据:" + new String(event.getData().getData()));
                }
            }
        });
        while (true) {

        }
    }

    
    @Test
    public void testTreeCache() throws Exception {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 10);
        Curatorframework client5 = CuratorframeworkFactory.builder()
                .connectString("121.43.170.118:2181")
                .retryPolicy(retryPolicy)
                .sessionTimeoutMs(60 * 1000)
                .connectionTimeoutMs(15 * 1000).build();
        client5.start();

        TreeCache treeCache = new TreeCache(client5, "/");
        treeCache.getListenable().addListener(new TreeCacheListener() {
            @Override
            public void childEvent(Curatorframework curatorframework, TreeCacheEvent treeCacheEvent) throws Exception {
                System.out.println("节点变更");
                System.out.println("节点路径:"+treeCacheEvent.getData().getPath());
                System.out.println("节点数据:"+new String(treeCacheEvent.getData().getData()));
            }
        });
        treeCache.start();
        while (true) {

        }
    }
}
2、Curator *** 作实现分布式锁

核心思想:利用临时节点的特性,获取锁时创建节点,释放资源时删除节点

步骤:

1、客户端获取锁时,在Lock节点下创建临时顺序节点(用临时节点的原因是让锁能够释放,顺序是实现排队)

2、所有连接客户端获取Lock节点下所有的节点,如果发现自己节点序号最小,则认为自己获取到了锁,开始对临界区资源进行 *** 作

3、如果发现自己并非最小的节点,说明自己并不能获取到锁,那就向序号比自己小的前一个节点注册Watcher事件,

4、如果发现比自己小的节点删除,则当前节点再进行判断,判断自己是否是最小的节点,如果是最小的节点,则获取锁,否则重复以上步骤

目前Curator支持5种锁方案:

InterProcessSemaphoreMutex 不可重入排他锁

InterProcessSemaphoreV2 信号量

InterProcessMutex 可重入排他锁

InterProcessMultiLock 多锁容器

InterProcessReadWriteLock 读写锁

我们先模拟多进程获取临界区资源

public class TestMutex {
    public static void main(String[] args) {
        ThreadWithoutMutex threadWithoutMutex = new ThreadWithoutMutex();
        Thread thread1 = new Thread(threadWithoutMutex,"用户1");
        Thread thread2 = new Thread(threadWithoutMutex,"用户2");
        Thread thread3 = new Thread(threadWithoutMutex,"用户3");

        thread1.start();
        thread2.start();
        thread3.start();
    }
}
public class ThreadWithoutMutex implements Runnable {
    private int num = 20;

    @Override
    public void run() {
        while (true) {
            if (num > 0) {
                System.out.println(Thread.currentThread().getName() + "获取到资源,剩余:" + num);
                num--;
            }
        }
    }
}

在没有锁的情况下,结果是有异常的,也就是常说的超卖

这里我们在进程内添加ZK分布式锁(我前几天遇到有个同学跟我交流,说我为什么要在一个JVM里写分布式锁,这样不是和实际情况不符么,我建议同学先了解清楚进程和线程的概念再来问这种问题。。。。。。。。。。。。。。。)

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.Curatorframework;
import org.apache.curator.framework.CuratorframeworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;

import java.util.concurrent.TimeUnit;

public class ThreadWithoutMutex implements Runnable {
    private int num = 20;

    private InterProcessMutex lock;

    public ThreadWithoutMutex(){
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 10);
        Curatorframework client5 = CuratorframeworkFactory.builder()
                .connectString("localhost:2181")
                .retryPolicy(retryPolicy)
                .sessionTimeoutMs(60 * 1000)
                .connectionTimeoutMs(15 * 1000).build();
        client5.start();
        lock = new InterProcessMutex(client5,"/lock");
    }
    @Override
    public void run() {
        while (true) {
            try {
                lock.acquire(1, TimeUnit.SECONDS);
                if (num > 0) {
                    System.out.println(Thread.currentThread().getName() + "获取到资源,剩余:" + num);
                    num--;
                }
            } catch (Exception e) {
                e.printStackTrace();
            }finally {
                try {
                    lock.release();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

这里的lock节点无需提前创建,你也无须手动创建Watcher,是不是非常方便

输出结果已经有序了,我的debug日志太多,就不展示了

以上,综合而言,ZK实现分布式锁比数据库实现分布式锁的实现成本要低得多,因为ZK天生就是开源的分布式锁,然而维护ZK的高可用,以及选举过程的长时间停用,都是ZK的问题所在,数据库虽然效率低,但是不用额外维护ZK集群,用手头工具就可以解决。当然还有redis分布式锁,这个我们再往下的redis里,再细说。

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5678092.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-16

发表评论

登录后才能评论

评论列表(0条)

保存