DistributedBarrier类的源码注释:
Distributed systems use barriers to block processing of a set of nodes until a condition is met at which time all the nodes are allowed to proceed.
分布式系统使用屏障来阻止一组节点的处理,直到满足允许所有节点继续的条件为止。
类比单体应用的屏障CyclicBarrier:
Java并发编程:CountDownLatch、CyclicBarrier、Semaphore初使用
DistributedBarrier源码:
public class DistributedBarrier { // Curatorframework实例,用于与Zookeeper进行交互 private final Curatorframework client; // 分布式屏障的路径 private final String barrierPath; // 监听器 private final Watcher watcher = new Watcher() { @Override public void process(WatchedEvent event) { client.postSafeNotify(DistributedBarrier.this); } }; public DistributedBarrier(Curatorframework client, String barrierPath) { this.client = client; this.barrierPath = PathUtils.validatePath(barrierPath); } public synchronized void setBarrier() throws Exception { try { client.create().creatingParentContainersIfNeeded().forPath(barrierPath); } catch ( KeeperException.NodeExistsException ignore ) { // ignore } } public synchronized void removeBarrier() throws Exception { try { client.delete().forPath(barrierPath); } catch ( KeeperException.NoNodeException ignore ) { // ignore } } public synchronized void waitOnBarrier() throws Exception { waitOnBarrier(-1, null); } public synchronized boolean waitOnBarrier(long maxWait, TimeUnit unit) throws Exception { long startMs = System.currentTimeMillis(); boolean hasMaxWait = (unit != null); long maxWaitMs = hasMaxWait ? TimeUnit.MILLISECONDS.convert(maxWait, unit) : Long.MAX_VALUE; boolean result; for(;;) { // 屏障节点是否不存在,true为不存在,false为存在 result = (client.checkExists().usingWatcher(watcher).forPath(barrierPath) == null); // 屏障节点不存在,直接退出 if ( result ) { break; } // 屏障节点存在,进行等待 if ( hasMaxWait ) { long elapsed = System.currentTimeMillis() - startMs; long thisWaitMs = maxWaitMs - elapsed; // 等待超时,直接退出 if ( thisWaitMs <= 0 ) { break; } // 继续等待 wait(thisWaitMs); } else { wait(); } } return result; } }
DistributedBarrier的源码还是比较简单的,就是通过一个Zookeeper节点的创建与删除来实现分布式屏障。
测试pom.xml:
4.0.0 com.kaven zookeeper1.0-SNAPSHOT 8 8 org.apache.curator curator-recipes5.2.0 org.projectlombok lombok1.18.22
CuratorframeworkProperties类(提供Curatorframework需要的一些配置信息,以及创建Curatorframework实例的方法):
package com.kaven.zookeeper; import org.apache.curator.RetryPolicy; import org.apache.curator.framework.Curatorframework; import org.apache.curator.framework.CuratorframeworkFactory; import org.apache.curator.framework.imps.CuratorframeworkState; import org.apache.curator.retry.ExponentialBackoffRetry; public class CuratorframeworkProperties { // 连接地址 public static final String CONNECT_ADDRESS = "192.168.1.3:9000"; // 连接超时时间 public static final int CONNECTION_TIMEOUT_MS = 40000; // Session超时时间 public static final int SESSION_TIMEOUT_MS = 10000; // 命名空间 public static final String NAMESPACE = "MyNamespace"; // 重试策略 public static final RetryPolicy RETRY_POLICY = new ExponentialBackoffRetry(1000, 3); public static Curatorframework getCuratorframework() { // 创建Curatorframework实例 Curatorframework curator = CuratorframeworkFactory.builder() .connectString(CuratorframeworkProperties.CONNECT_ADDRESS) .retryPolicy(CuratorframeworkProperties.RETRY_POLICY) .connectionTimeoutMs(CuratorframeworkProperties.CONNECTION_TIMEOUT_MS) .sessionTimeoutMs(CuratorframeworkProperties.SESSION_TIMEOUT_MS) .namespace(CuratorframeworkProperties.NAMESPACE) .build(); curator.start(); assert curator.getState().equals(CuratorframeworkState.STARTED); return curator; } }
DistributedBarrierRunnable类(实现了Runnable接口,模拟分布式节点等待分布式屏障):
package com.kaven.zookeeper; import lombok.SneakyThrows; import org.apache.curator.framework.Curatorframework; import org.apache.curator.framework.recipes.barriers.DistributedBarrier; import java.util.Random; public class DistributedBarrierRunnable implements Runnable{ @SneakyThrows @Override public void run() { // 使用不同的Curatorframework实例,表示不同的分布式节点 Curatorframework curator = CuratorframeworkProperties.getCuratorframework(); // 模拟随机加入的分布式节点 int randomSleep = new Random().nextInt(1000); Thread.sleep(randomSleep); // 分布式屏障的路径 String barrierPath = "/kaven"; // 创建DistributedBarrier实例,用于提供分布式屏障功能 DistributedBarrier barrier = new DistributedBarrier(curator, barrierPath); System.out.println(Thread.currentThread().getName() + " 等待屏障被移除"); long start = System.currentTimeMillis(); // 等待屏障被移除 barrier.waitOnBarrier(); System.out.println(Thread.currentThread().getName() + " 等待了 " + (System.currentTimeMillis() - start) / 1000 + " s"); System.out.println(Thread.currentThread().getName() + " 继续执行"); } }
启动类:
package com.kaven.zookeeper; import org.apache.curator.framework.Curatorframework; import org.apache.curator.framework.recipes.barriers.DistributedBarrier; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class Application { private static final ExecutorService EXECUTOR_SERVICE = Executors.newCachedThreadPool(); public static void main(String[] args) throws Exception { // 创建Curatorframework实例 Curatorframework curator = CuratorframeworkProperties.getCuratorframework(); // 分布式屏障的路径 String barrierPath = "/kaven"; // 创建DistributedBarrier实例,用于设置和在适当时机删除屏障 DistributedBarrier barrier = new DistributedBarrier(curator, barrierPath); // 设置屏障 barrier.setBarrier(); // 分布式节点处理业务 for (int i = 0; i < 5; i++) { EXECUTOR_SERVICE.execute(new DistributedBarrierRunnable()); } // 模拟移除屏障需要处理的业务 Thread.sleep(20000); // 移除屏障 barrier.removeBarrier(); } }
模拟5个分布式节点等待分布式屏障,输出如下所示:
pool-1-thread-5 等待屏障被移除 pool-1-thread-3 等待屏障被移除 pool-1-thread-1 等待屏障被移除 pool-1-thread-4 等待屏障被移除 pool-1-thread-2 等待屏障被移除 pool-1-thread-2 等待了 19 s pool-1-thread-2 继续执行 pool-1-thread-5 等待了 19 s pool-1-thread-5 继续执行 pool-1-thread-3 等待了 19 s pool-1-thread-3 继续执行 pool-1-thread-4 等待了 19 s pool-1-thread-4 继续执行 pool-1-thread-1 等待了 19 s pool-1-thread-1 继续执行
使用提供超时机制的等待屏障方法:
package com.kaven.zookeeper; import lombok.SneakyThrows; import org.apache.curator.framework.Curatorframework; import org.apache.curator.framework.recipes.barriers.DistributedBarrier; import java.util.Random; import java.util.concurrent.TimeUnit; public class DistributedBarrierRunnable implements Runnable{ @SneakyThrows @Override public void run() { // 使用不同的Curatorframework实例,表示不同的分布式节点 Curatorframework curator = CuratorframeworkProperties.getCuratorframework(); // 模拟随机加入的分布式节点 int randomSleep = new Random().nextInt(1000); Thread.sleep(randomSleep); // 分布式屏障的路径 String barrierPath = "/kaven"; // 创建DistributedBarrier实例,用于提供分布式屏障功能 DistributedBarrier barrier = new DistributedBarrier(curator, barrierPath); System.out.println(Thread.currentThread().getName() + " 等待屏障被移除"); long start = System.currentTimeMillis(); // 等待屏障被移除 boolean result = barrier.waitOnBarrier(10, TimeUnit.SECONDS); System.out.println(Thread.currentThread().getName() + " 等待了 " + (System.currentTimeMillis() - start) / 1000 + " s"); if(result) { System.out.println(Thread.currentThread().getName() + " 继续执行"); } else { // 等待屏障超时 System.out.println(Thread.currentThread().getName() + " 等待屏障超时"); } } }
输出如下所示:
pool-1-thread-1 等待屏障被移除 pool-1-thread-2 等待屏障被移除 pool-1-thread-3 等待屏障被移除 pool-1-thread-4 等待屏障被移除 pool-1-thread-5 等待屏障被移除 pool-1-thread-1 等待了 10 s pool-1-thread-1 等待屏障超时 pool-1-thread-2 等待了 10 s pool-1-thread-2 等待屏障超时 pool-1-thread-3 等待了 10 s pool-1-thread-3 等待屏障超时 pool-1-thread-4 等待了 10 s pool-1-thread-4 等待屏障超时 pool-1-thread-5 等待了 10 s pool-1-thread-5 等待屏障超时
符合预期。
Curator框架的分布式屏障DistributedBarrier就介绍到这里,如果博主有说错的地方或者大家有不同的见解,欢迎大家评论补充。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)