ZooKeeper : Curator框架之分布式锁InterProcessReadWriteLock

ZooKeeper : Curator框架之分布式锁InterProcessReadWriteLock,第1张

ZooKeeper : Curator框架之分布式锁InterProcessReadWriteLock InterProcessReadWriteLock

跨JVM工作的可重入读/写互斥锁,使用Zookeeper来持有锁,所有JVM中使用相同锁路径的所有进程都将实现进程间临界区。这个互斥锁是公平的,每个用户都会按照请求的顺序获得互斥锁(从ZK的角度来看)。

读写锁维护一对关联的锁,一个用于只读 *** 作,一个用于写 *** 作。只要没有写锁,读锁可以被多个用户同时持有,而写锁是独占的。

读写锁允许从写锁降级为读锁,方法是先获取写锁,然后就可以获取读锁。但是,无法从读锁升级到写锁。

测试

pom.xml:



    4.0.0

    com.kaven
    zookeeper
    1.0-SNAPSHOT

    
        8
        8
    

    
        
            org.apache.curator
            curator-recipes
            5.2.0
        
        
            org.projectlombok
            lombok
            1.18.22
        
    

CuratorframeworkProperties类(提供Curatorframework需要的一些配置信息,以及创建Curatorframework实例的方法):

package com.kaven.zookeeper;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.Curatorframework;
import org.apache.curator.framework.CuratorframeworkFactory;
import org.apache.curator.framework.imps.CuratorframeworkState;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class CuratorframeworkProperties {
    // 连接地址
    public static final String CONNECT_ADDRESS = "192.168.1.3:9000";
    // 连接超时时间
    public static final int CONNECTION_TIMEOUT_MS = 40000;
    // Session超时时间
    public static final int SESSION_TIMEOUT_MS = 10000;
    // 命名空间
    public static final String NAMESPACE = "MyNamespace";
    // 重试策略
    public static final RetryPolicy RETRY_POLICY = new ExponentialBackoffRetry(1000, 3);

    public static Curatorframework getCuratorframework() {
        // 创建Curatorframework实例
        Curatorframework curator = CuratorframeworkFactory.builder()
                .connectString(CuratorframeworkProperties.CONNECT_ADDRESS)
                .retryPolicy(CuratorframeworkProperties.RETRY_POLICY)
                .connectionTimeoutMs(CuratorframeworkProperties.CONNECTION_TIMEOUT_MS)
                .sessionTimeoutMs(CuratorframeworkProperties.SESSION_TIMEOUT_MS)
                .namespace(CuratorframeworkProperties.NAMESPACE)
                .build();
        curator.start();
        assert curator.getState().equals(CuratorframeworkState.STARTED);
        return curator;
    }
}

InterProcessReadWriteLockRunnable类(实现了Runnable接口,模拟分布式节点获取分布式锁):

package com.kaven.zookeeper;

import lombok.SneakyThrows;
import org.apache.curator.framework.Curatorframework;
import org.apache.curator.framework.recipes.locks.InterProcessReadWriteLock;

import java.nio.charset.StandardCharsets;
import java.util.Random;

public class InterProcessReadWriteLockRunnable implements Runnable{
    @SneakyThrows
    @Override
    public void run() {
        // 使用不同的Curatorframework实例,表示不同的分布式节点
        Curatorframework curator = CuratorframeworkProperties.getCuratorframework();

        // 分布式锁的路径
        String basePath = "/kaven";

        // 创建InterProcessReadWriteLock实例,用于提供分布式锁的功能
        InterProcessReadWriteLock readWriteLock = new InterProcessReadWriteLock(curator, basePath,
                "分布式读写锁".getBytes(StandardCharsets.UTF_8));

        // 根据随机数来决定获取写锁还是读锁
        Random random = new Random();
        if(random.nextInt(10000) > 5000) {
            // 获取写锁
            readWriteLock.writeLock().acquire();
            System.out.println(Thread.currentThread().getName() + "获取写锁");
            Thread.sleep(2000);
            System.out.println(Thread.currentThread().getName() + "释放写锁");
            // 释放写锁
            readWriteLock.writeLock().release();
        }
        else {
            // 获取读锁
            readWriteLock.readLock().acquire();
            System.out.println(Thread.currentThread().getName() + "获取读锁");
            Thread.sleep(2000);
            System.out.println(Thread.currentThread().getName() + "释放读锁");
            // 释放读锁
            readWriteLock.readLock().release();
        }
    }
}

启动类:

package com.kaven.zookeeper;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Application {
    private static final ExecutorService EXECUTOR_SERVICE = Executors.newCachedThreadPool();

    public static void main(String[] args) throws Exception {
        // 分布式节点处理业务
        for (int i = 0; i < 15; i++) {
            EXECUTOR_SERVICE.execute(new InterProcessReadWriteLockRunnable());
        }
    }
}

模拟15个分布式节点获取分布式锁,输出如下所示:

pool-1-thread-8获取写锁
pool-1-thread-8释放写锁
pool-1-thread-13获取写锁
pool-1-thread-13释放写锁
pool-1-thread-12获取读锁
pool-1-thread-15获取读锁
pool-1-thread-12释放读锁
pool-1-thread-15释放读锁
pool-1-thread-1获取写锁
pool-1-thread-1释放写锁
pool-1-thread-6获取读锁
pool-1-thread-2获取读锁
pool-1-thread-14获取读锁
pool-1-thread-9获取读锁
pool-1-thread-2释放读锁
pool-1-thread-6释放读锁
pool-1-thread-14释放读锁
pool-1-thread-9释放读锁
pool-1-thread-4获取写锁
pool-1-thread-4释放写锁
pool-1-thread-11获取写锁
pool-1-thread-11释放写锁
pool-1-thread-5获取读锁
pool-1-thread-5释放读锁
pool-1-thread-10获取写锁
pool-1-thread-10释放写锁
pool-1-thread-7获取写锁
pool-1-thread-7释放写锁
pool-1-thread-3获取读锁
pool-1-thread-3释放读锁

为了验证输出是否符合预期,可以通过ZooKeeper提供的客户端获取锁路径下的所有节点,如下图所示:

排序后如下所示,和输出是对应的(读锁可以被多个用户同时持有,而写锁是独占的)。

/MyNamespace/kaven/_c_cd949fbc-c779-46e6-a7d8-4c4c90f13f23-__WRIT__0000000000
/MyNamespace/kaven/_c_a4934270-e9c4-40c0-8b89-5fb369a7cfaa-__WRIT__0000000001
/MyNamespace/kaven/_c_5a03751b-7f45-4813-99f6-49671c473367-__READ__0000000002
/MyNamespace/kaven/_c_babe2eec-7259-484f-9f76-51a5fc45d607-__READ__0000000003
/MyNamespace/kaven/_c_405e6cc8-481b-4d9a-8cf1-a83ea69912dd-__WRIT__0000000004
/MyNamespace/kaven/_c_a870b175-c082-4661-9383-4a82a021e283-__READ__0000000005
/MyNamespace/kaven/_c_3fa721b5-1015-46e8-8da3-20b333b76ca4-__READ__0000000006
/MyNamespace/kaven/_c_03e0208c-fb25-4229-9534-4dc6c295d6a8-__READ__0000000007
/MyNamespace/kaven/_c_5428e1af-c9fc-4afd-92f7-76fbbb545dcc-__READ__0000000008
/MyNamespace/kaven/_c_395407f3-c402-40b4-abe2-5750f43ddd9d-__WRIT__0000000009
/MyNamespace/kaven/_c_c7a1a90c-8b76-47fc-8c9e-1dc15c43a8bd-__WRIT__0000000010
/MyNamespace/kaven/_c_800861ea-8da2-4ff5-b237-19d8b939393e-__READ__0000000011
/MyNamespace/kaven/_c_3d357ec6-5288-4719-ba3b-7f98eceb32d6-__WRIT__0000000012
/MyNamespace/kaven/_c_e8067da5-bf08-4876-8200-1746dbc2ce77-__WRIT__0000000013
/MyNamespace/kaven/_c_8d4e07a2-08b9-4c1d-91d4-3b0d76844b33-__READ__0000000014
锁降级
public class InterProcessReadWriteLockRunnable implements Runnable{
    @SneakyThrows
    @Override
    public void run() {
        // 使用不同的Curatorframework实例,表示不同的分布式节点
        Curatorframework curator = CuratorframeworkProperties.getCuratorframework();

        // 分布式锁的路径
        String basePath = "/kaven";

        // 创建InterProcessReadWriteLock实例,用于提供分布式锁的功能
        InterProcessReadWriteLock readWriteLock = new InterProcessReadWriteLock(curator, basePath,
                "分布式读写锁".getBytes(StandardCharsets.UTF_8));

        // 获取写锁
        readWriteLock.writeLock().acquire();
        System.out.println(Thread.currentThread().getName() + "获取写锁");
        Thread.sleep(2000);
        // 锁降级
        readWriteLock.readLock().acquire();
        System.out.println(Thread.currentThread().getName() + "获取读锁,锁降级成功");
        Thread.sleep(2000);
        // 释放读锁
        System.out.println(Thread.currentThread().getName() + "释放读锁");
        readWriteLock.readLock().release();
        // 释放写锁
        System.out.println(Thread.currentThread().getName() + "释放写锁");
        readWriteLock.writeLock().release();
    }
}
public class Application {
    private static final ExecutorService EXECUTOR_SERVICE = Executors.newCachedThreadPool();

    public static void main(String[] args) throws Exception {
        // 分布式节点处理业务
        for (int i = 0; i < 5; i++) {
            EXECUTOR_SERVICE.execute(new InterProcessReadWriteLockRunnable());
        }
    }
}

输出如下所示:

pool-1-thread-3获取写锁
pool-1-thread-3获取读锁,锁降级成功
pool-1-thread-3释放读锁
pool-1-thread-3释放写锁
pool-1-thread-1获取写锁
pool-1-thread-1获取读锁,锁降级成功
pool-1-thread-1释放读锁
pool-1-thread-1释放写锁
pool-1-thread-4获取写锁
pool-1-thread-4获取读锁,锁降级成功
pool-1-thread-4释放读锁
pool-1-thread-4释放写锁
pool-1-thread-5获取写锁
pool-1-thread-5获取读锁,锁降级成功
pool-1-thread-5释放读锁
pool-1-thread-5释放写锁
pool-1-thread-2获取写锁
pool-1-thread-2获取读锁,锁降级成功
pool-1-thread-2释放读锁
pool-1-thread-2释放写锁

InterProcessReadWriteLock类除了构造方法外,就只有readLock和writeLock这两个方法可以调用,而这两个方法的返回值是InterProcessMutex实例,因此分布式读写锁InterProcessReadWriteLock的实现是基于分布式锁InterProcessMutex。

ZooKeeper : Curator框架之分布式锁InterProcessMutex

Curator框架的分布式锁InterProcessReadWriteLock就介绍到这里,如果博主有说错的地方或者大家有不同的见解,欢迎大家评论补充。

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5711206.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存