ZooKeeper : Curator框架之分布式屏障DistributedBarrier

ZooKeeper : Curator框架之分布式屏障DistributedBarrier,第1张

ZooKeeper : Curator框架之分布式屏障DistributedBarrier DistributedBarrier

DistributedBarrier类的源码注释:

Distributed systems use barriers to block processing of a set of nodes until a condition is met at which time all the nodes are allowed to proceed.

分布式系统使用屏障来阻止一组节点的处理,直到满足允许所有节点继续的条件为止。

类比单体应用的屏障CyclicBarrier:

Java并发编程:CountDownLatch、CyclicBarrier、Semaphore初使用

DistributedBarrier源码:

public class DistributedBarrier
{
    // Curatorframework实例,用于与Zookeeper进行交互
    private final Curatorframework client;
    // 分布式屏障的路径
    private final String barrierPath;
    // 监听器
    private final Watcher watcher = new Watcher()
    {
        @Override
        public void process(WatchedEvent event)
        {
            client.postSafeNotify(DistributedBarrier.this);
        }
    };

    
    public DistributedBarrier(Curatorframework client, String barrierPath)
    {
        this.client = client;
        this.barrierPath = PathUtils.validatePath(barrierPath);
    }

    
    public synchronized void         setBarrier() throws Exception
    {
        try
        {
            client.create().creatingParentContainersIfNeeded().forPath(barrierPath);
        }
        catch ( KeeperException.NodeExistsException ignore )
        {
            // ignore
        }
    }

    
    public synchronized void      removeBarrier() throws Exception
    {
        try
        {
            client.delete().forPath(barrierPath);
        }
        catch ( KeeperException.NoNodeException ignore )
        {
            // ignore
        }
    }

    
    public synchronized void      waitOnBarrier() throws Exception
    {
        waitOnBarrier(-1, null);
    }

    
    public synchronized boolean      waitOnBarrier(long maxWait, TimeUnit unit) throws Exception
    {
        long            startMs = System.currentTimeMillis();
        boolean         hasMaxWait = (unit != null);
        long            maxWaitMs = hasMaxWait ? TimeUnit.MILLISECONDS.convert(maxWait, unit) : Long.MAX_VALUE;

        boolean         result;
        for(;;)
        {
            // 屏障节点是否不存在,true为不存在,false为存在
            result = (client.checkExists().usingWatcher(watcher).forPath(barrierPath) == null);
            // 屏障节点不存在,直接退出
            if ( result )
            {
                break;
            }
            // 屏障节点存在,进行等待
            if ( hasMaxWait )
            {
                long        elapsed = System.currentTimeMillis() - startMs;
                long        thisWaitMs = maxWaitMs - elapsed;
                // 等待超时,直接退出
                if ( thisWaitMs <= 0 )
                {
                    break;
                }
                // 继续等待
                wait(thisWaitMs);
            }
            else
            {
                wait();
            }
        }
        return result;
    }
}

DistributedBarrier的源码还是比较简单的,就是通过一个Zookeeper节点的创建与删除来实现分布式屏障。

测试

pom.xml:



    4.0.0

    com.kaven
    zookeeper
    1.0-SNAPSHOT

    
        8
        8
    

    
        
            org.apache.curator
            curator-recipes
            5.2.0
        
        
            org.projectlombok
            lombok
            1.18.22
        
    

CuratorframeworkProperties类(提供Curatorframework需要的一些配置信息,以及创建Curatorframework实例的方法):

package com.kaven.zookeeper;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.Curatorframework;
import org.apache.curator.framework.CuratorframeworkFactory;
import org.apache.curator.framework.imps.CuratorframeworkState;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class CuratorframeworkProperties {
    // 连接地址
    public static final String CONNECT_ADDRESS = "192.168.1.3:9000";
    // 连接超时时间
    public static final int CONNECTION_TIMEOUT_MS = 40000;
    // Session超时时间
    public static final int SESSION_TIMEOUT_MS = 10000;
    // 命名空间
    public static final String NAMESPACE = "MyNamespace";
    // 重试策略
    public static final RetryPolicy RETRY_POLICY = new ExponentialBackoffRetry(1000, 3);

    public static Curatorframework getCuratorframework() {
        // 创建Curatorframework实例
        Curatorframework curator = CuratorframeworkFactory.builder()
                .connectString(CuratorframeworkProperties.CONNECT_ADDRESS)
                .retryPolicy(CuratorframeworkProperties.RETRY_POLICY)
                .connectionTimeoutMs(CuratorframeworkProperties.CONNECTION_TIMEOUT_MS)
                .sessionTimeoutMs(CuratorframeworkProperties.SESSION_TIMEOUT_MS)
                .namespace(CuratorframeworkProperties.NAMESPACE)
                .build();
        curator.start();
        assert curator.getState().equals(CuratorframeworkState.STARTED);
        return curator;
    }
}

DistributedBarrierRunnable类(实现了Runnable接口,模拟分布式节点等待分布式屏障):

package com.kaven.zookeeper;

import lombok.SneakyThrows;
import org.apache.curator.framework.Curatorframework;
import org.apache.curator.framework.recipes.barriers.DistributedBarrier;

import java.util.Random;

public class DistributedBarrierRunnable implements Runnable{
    @SneakyThrows
    @Override
    public void run() {
        // 使用不同的Curatorframework实例,表示不同的分布式节点
        Curatorframework curator = CuratorframeworkProperties.getCuratorframework();

        // 模拟随机加入的分布式节点
        int randomSleep = new Random().nextInt(1000);
        Thread.sleep(randomSleep);
        
        // 分布式屏障的路径
        String barrierPath = "/kaven";

        // 创建DistributedBarrier实例,用于提供分布式屏障功能
        DistributedBarrier barrier = new DistributedBarrier(curator, barrierPath);

        System.out.println(Thread.currentThread().getName() + " 等待屏障被移除");
        long start = System.currentTimeMillis();
        // 等待屏障被移除
        barrier.waitOnBarrier();
        System.out.println(Thread.currentThread().getName() + " 等待了 "
                + (System.currentTimeMillis() - start) / 1000 + " s");
        System.out.println(Thread.currentThread().getName() + " 继续执行");
    }
}

启动类:

package com.kaven.zookeeper;

import org.apache.curator.framework.Curatorframework;
import org.apache.curator.framework.recipes.barriers.DistributedBarrier;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Application {
    private static final ExecutorService EXECUTOR_SERVICE = Executors.newCachedThreadPool();

    public static void main(String[] args) throws Exception {
        // 创建Curatorframework实例
        Curatorframework curator = CuratorframeworkProperties.getCuratorframework();

        // 分布式屏障的路径
        String barrierPath = "/kaven";

        // 创建DistributedBarrier实例,用于设置和在适当时机删除屏障
        DistributedBarrier barrier = new DistributedBarrier(curator, barrierPath);

        // 设置屏障
        barrier.setBarrier();

        // 分布式节点处理业务
        for (int i = 0; i < 5; i++) {
            EXECUTOR_SERVICE.execute(new DistributedBarrierRunnable());
        }

        // 模拟移除屏障需要处理的业务
        Thread.sleep(20000);

        // 移除屏障
        barrier.removeBarrier();
    }
}

模拟5个分布式节点等待分布式屏障,输出如下所示:

pool-1-thread-5 等待屏障被移除
pool-1-thread-3 等待屏障被移除
pool-1-thread-1 等待屏障被移除
pool-1-thread-4 等待屏障被移除
pool-1-thread-2 等待屏障被移除
pool-1-thread-2 等待了 19 s
pool-1-thread-2 继续执行
pool-1-thread-5 等待了 19 s
pool-1-thread-5 继续执行
pool-1-thread-3 等待了 19 s
pool-1-thread-3 继续执行
pool-1-thread-4 等待了 19 s
pool-1-thread-4 继续执行
pool-1-thread-1 等待了 19 s
pool-1-thread-1 继续执行

使用提供超时机制的等待屏障方法:

package com.kaven.zookeeper;

import lombok.SneakyThrows;
import org.apache.curator.framework.Curatorframework;
import org.apache.curator.framework.recipes.barriers.DistributedBarrier;

import java.util.Random;
import java.util.concurrent.TimeUnit;

public class DistributedBarrierRunnable implements Runnable{
    @SneakyThrows
    @Override
    public void run() {
        // 使用不同的Curatorframework实例,表示不同的分布式节点
        Curatorframework curator = CuratorframeworkProperties.getCuratorframework();

        // 模拟随机加入的分布式节点
        int randomSleep = new Random().nextInt(1000);
        Thread.sleep(randomSleep);

        // 分布式屏障的路径
        String barrierPath = "/kaven";

        // 创建DistributedBarrier实例,用于提供分布式屏障功能
        DistributedBarrier barrier = new DistributedBarrier(curator, barrierPath);

        System.out.println(Thread.currentThread().getName() + " 等待屏障被移除");
        long start = System.currentTimeMillis();
        
        // 等待屏障被移除
        boolean result = barrier.waitOnBarrier(10, TimeUnit.SECONDS);
        
        System.out.println(Thread.currentThread().getName() + " 等待了 "
                + (System.currentTimeMillis() - start) / 1000 + " s");
        if(result) {
            System.out.println(Thread.currentThread().getName() + " 继续执行");
        }
        else {
            // 等待屏障超时
            System.out.println(Thread.currentThread().getName() + " 等待屏障超时");
        }
    }
}

输出如下所示:

pool-1-thread-1 等待屏障被移除
pool-1-thread-2 等待屏障被移除
pool-1-thread-3 等待屏障被移除
pool-1-thread-4 等待屏障被移除
pool-1-thread-5 等待屏障被移除
pool-1-thread-1 等待了 10 s
pool-1-thread-1 等待屏障超时
pool-1-thread-2 等待了 10 s
pool-1-thread-2 等待屏障超时
pool-1-thread-3 等待了 10 s
pool-1-thread-3 等待屏障超时
pool-1-thread-4 等待了 10 s
pool-1-thread-4 等待屏障超时
pool-1-thread-5 等待了 10 s
pool-1-thread-5 等待屏障超时

符合预期。

Curator框架的分布式屏障DistributedBarrier就介绍到这里,如果博主有说错的地方或者大家有不同的见解,欢迎大家评论补充。

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5706803.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存