Java中栅栏CyclicBarrier是一种同步机制,栅栏能够让一组线程到达一个同步点时被阻塞,直到这组线程中的最后一个线程到达同步点,所有被阻塞的线程才会被唤醒继续执行,即目的是只有某一组线程全部执行到同步点时,才继续执行,否则先到达同步点的线程将阻塞。
栅栏CyclicBarrier与闭锁CountDownLatch的功能类似,分析过源码后会简要列举二者区别。
CyclicBarrier类的构造函数会要求使用者为parties变量赋初值,parties变量表示一组线程的总数量,CyclicBarrier内用计数器变量count表示未到达同步点线程的数量。当某一线程到达同步点时,则调用await()方法,此过程中会使count自减,此时如果计数器值非零,说明仍有线程未到达同步点,则当前线程进入阻塞状态,如果计数器值为零,说明全部线程到达了同步点,则唤醒全部线程继续执行。
栅栏源码分析分析核心方法之前,先关注一下CyclicBarrier类中的成员变量以及构造方法,源码如下:
//重入锁主要用于保证线程安全 private final ReentrantLock lock = new ReentrantLock(); //Condition用于阻塞和唤醒线程 private final Condition trip = lock.newCondition(); //表示一组线程的总数量 private final int parties; //本变量表示当所有线程到达同步点且被唤醒前 将要被执行的 *** 作 (可以为null 表示无 *** 作) private final Runnable barrierCommand; private Generation generation = new Generation(); //表示还未执行到同步点的线程数量 当该变量为0时表示所有线程均已执行到同步点 private int count; // 主要进行初始化 *** 作 barrierAction参数说明同变量barrierCommand public CyclicBarrier(int parties, Runnable barrierAction) { if (parties <= 0) throw new IllegalArgumentException(); this.parties = parties; this.count = parties; this.barrierCommand = barrierAction; } public CyclicBarrier(int parties) { this(parties, null); }
接下来关注CyclicBarrier类中的静态内部类Generation及成员变量generation,为了讲解Generation方便,我们必须先了解到,与闭锁和信号量不同,CyclicBarrier是可重用的,具体实现会在后文提到,先来说明Generation:
private static class Generation { boolean broken = false; } private Generation generation = new Generation();
下面着重关注CyclicBarrier的核心代码部分,源码如下:
public int await() throws InterruptedException, BrokenBarrierException { try { return dowait(false, 0L); } catch (TimeoutException toe) { throw new Error(toe); // cannot happen } } public int await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException { return dowait(true, unit.toNanos(timeout)); } //CyclicBarrier的核心方法 private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException { final ReentrantLock lock = this.lock; lock.lock(); try { //获取当前代 final Generation g = generation; //如果当前代已被破坏 则抛出异常 if (g.broken) throw new BrokenBarrierException(); if (Thread.interrupted()) { breakBarrier(); throw new InterruptedException(); } int index = --count; //index为0 表示全部线程执行到同步点 if (index == 0) { //用于判断下列try块中代码(具体是barrierCommand)是否顺利执行 boolean ranAction = false; try { //在唤醒其他线程前 若有指定 则优先执行barrierCommand final Runnable command = barrierCommand; if (command != null) command.run(); //执行成功 ranAction = true; //换代 *** 作 //该方法会唤醒等待队列中的线程 创建一个新的Generation实例 并重置count nextGeneration(); return 0; } finally { if (!ranAction) breakBarrier(); } } // 若能执行到这里 代表仍有线程未到达同步点 for (;;) { try { if (!timed) trip.await(); else if (nanos > 0L) nanos = trip.awaitNanos(nanos); } catch (InterruptedException ie) { if (g == generation && ! g.broken) { breakBarrier(); throw ie; } else { //如果不满足if条件 说明线程不属于当前一代 中断 Thread.currentThread().interrupt(); } } //检测到当前代已被破坏 抛出异常 if (g.broken) throw new BrokenBarrierException(); //dowait执行过程中 或执行了nextGeneration或外界调用reset 已经换代 方法返回 if (g != generation) return index; if (timed && nanos <= 0L) { breakBarrier(); throw new TimeoutException(); } } } finally { //无论执行结果如何 释放重入锁 以便同步队列中的其他线程尝试获取锁 lock.unlock(); } }
最后简要分析其余方法,源码如下:
private void nextGeneration() { // 唤醒等待队列中全部线程 trip.signalAll(); // 重置count计数器 count = parties; //创建一个新的Generation实例 表示换代 generation = new Generation(); } //未尝试获取锁与nextGeneration()同理 private void breakBarrier() { //标记当前代已被破坏 generation.broken = true; count = parties; trip.signalAll(); } //获取总线程数 原子 *** 作 不需要获取锁 public int getParties() { return parties; } //独占地判断当前代是否已经被破坏 保证其他线程不会同时对broken执行写 *** 作 线程安全 public boolean isBroken() { final ReentrantLock lock = this.lock; lock.lock(); try { return generation.broken; } finally { lock.unlock(); } } //独占地重置CyclicBarrier public void reset() { final ReentrantLock lock = this.lock; lock.lock(); try { breakBarrier(); // break the current generation nextGeneration(); // start a new generation } finally { lock.unlock(); } } //独占地获取正在等待的线程数量 保证其他线程不会同时对count执行写 *** 作 线程安全 public int getNumberWaiting() { final ReentrantLock lock = this.lock; lock.lock(); try { return parties - count; } finally { lock.unlock(); } }栅栏使用的简单实例
假设有如下场景需求:假设有N组数据需要分别计算后,让各组计算结果对某一线程可见,再由该线程经过某些计算后得出最终结果,且该过程需要多次执行。可以考虑使用栅栏解决该问题,示例代码及结果如下
public class CyclicBarrierTest { public static final int N = 3; private static class Compute implements Runnable{ private final CyclicBarrier barrier; private final Integer id; private Compute(CyclicBarrier barrier, Integer id) { this.barrier = barrier; this.id = id; } @Override public void run() { System.out.println("开始执行第" + id + "组数据计算"); try { //某些计算过程 Thread.sleep(500); System.out.println("第" + id + "组数据计算完毕.主线程可见结果"); barrier.await(); System.out.println("全部" + id + "阻塞完毕"); } catch (Exception e) {} } } public static void main(String[] args) throws InterruptedException { CyclicBarrier barrier = new CyclicBarrier(N+1); System.out.println("第一轮次计算"); for (int i = 0; i < N; i++) { Thread thread = new Thread(new Compute(barrier,i)); thread.start(); } try { System.out.println("主线程阻塞等待其他线程计算结果"); barrier.await(); Thread.sleep(50); System.out.println("主线程得到三组计算结果 计算并得到最终结果"); } catch (Exception e) {} barrier.reset(); //仅创建一个线程 证明可重用即可 Thread.sleep(50); System.out.println("第二轮次计算"); Thread thread = new Thread(new Compute(barrier,1)); thread.start(); } }
第一轮次计算 主线程阻塞等待其他线程计算结果 开始执行第1组数据计算 开始执行第0组数据计算 开始执行第2组数据计算 第0组数据计算完毕.主线程可见结果 第1组数据计算完毕.主线程可见结果 第2组数据计算完毕.主线程可见结果 全部2阻塞完毕 全部0阻塞完毕 全部1阻塞完毕 主线程得到三组计算结果 计算并得到最终结果 第二轮次计算 开始执行第1组数据计算 第1组数据计算完毕.主线程可见结果栅栏的部分特点与闭锁CountDownLatch的区别
- 栅栏用于等待一组线程全部执行到同步点,而闭锁用于某个或多个线程等待某外部事件执行完毕。
- 栅栏可重用,闭锁是一次性的。
- 栅栏用到了同步队列(获取释放锁)和等待队列(阻塞和唤醒线程),闭锁仅使用了同步队列。
以上便是本篇文章的全部内容
作者才疏学浅,如文中出现纰漏,还望指正
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)