Java栅栏CyclicBarrier及源码详解

Java栅栏CyclicBarrier及源码详解,第1张

Java栅栏CyclicBarrier及源码详解 什么是栅栏

Java中栅栏CyclicBarrier是一种同步机制,栅栏能够让一组线程到达一个同步点时被阻塞,直到这组线程中的最后一个线程到达同步点,所有被阻塞的线程才会被唤醒继续执行,即目的是只有某一组线程全部执行到同步点时,才继续执行,否则先到达同步点的线程将阻塞。
栅栏CyclicBarrier与闭锁CountDownLatch的功能类似,分析过源码后会简要列举二者区别。

栅栏实现思路

CyclicBarrier类的构造函数会要求使用者为parties变量赋初值,parties变量表示一组线程的总数量,CyclicBarrier内用计数器变量count表示未到达同步点线程的数量。当某一线程到达同步点时,则调用await()方法,此过程中会使count自减,此时如果计数器值非零,说明仍有线程未到达同步点,则当前线程进入阻塞状态,如果计数器值为零,说明全部线程到达了同步点,则唤醒全部线程继续执行。

栅栏源码分析

分析核心方法之前,先关注一下CyclicBarrier类中的成员变量以及构造方法,源码如下:

	
	//重入锁主要用于保证线程安全
    private final ReentrantLock lock = new ReentrantLock();
    //Condition用于阻塞和唤醒线程
    private final Condition trip = lock.newCondition();
	//表示一组线程的总数量
    private final int parties;
	//本变量表示当所有线程到达同步点且被唤醒前 将要被执行的 *** 作 (可以为null 表示无 *** 作)
    private final Runnable barrierCommand;
	
    private Generation generation = new Generation();
	//表示还未执行到同步点的线程数量 当该变量为0时表示所有线程均已执行到同步点
	private int count;
	
	// 主要进行初始化 *** 作 barrierAction参数说明同变量barrierCommand
	public CyclicBarrier(int parties, Runnable barrierAction) {
        if (parties <= 0) throw new IllegalArgumentException();
        this.parties = parties;
        this.count = parties;
        this.barrierCommand = barrierAction;
    }

    public CyclicBarrier(int parties) {
        this(parties, null);
    }

接下来关注CyclicBarrier类中的静态内部类Generation及成员变量generation,为了讲解Generation方便,我们必须先了解到,与闭锁和信号量不同,CyclicBarrier是可重用的,具体实现会在后文提到,先来说明Generation:

	
	private static class Generation {
        boolean broken = false;
    }

	private Generation generation = new Generation();

下面着重关注CyclicBarrier的核心代码部分,源码如下:

	
	public int await() throws InterruptedException, BrokenBarrierException {
        try {
            return dowait(false, 0L);
        } catch (TimeoutException toe) {
            throw new Error(toe); // cannot happen
        }
    }

    public int await(long timeout, TimeUnit unit)
        throws InterruptedException,
               BrokenBarrierException,
               TimeoutException {
        return dowait(true, unit.toNanos(timeout));
    }
    
	//CyclicBarrier的核心方法
	private int dowait(boolean timed, long nanos)
        throws InterruptedException, BrokenBarrierException,
               TimeoutException {
        
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
        	//获取当前代
            final Generation g = generation;
			//如果当前代已被破坏 则抛出异常
            if (g.broken)
                throw new BrokenBarrierException();
			
            if (Thread.interrupted()) {
                breakBarrier();
                throw new InterruptedException();
            }
			
            int index = --count;
            //index为0 表示全部线程执行到同步点
            if (index == 0) {
            	//用于判断下列try块中代码(具体是barrierCommand)是否顺利执行
                boolean ranAction = false;
                try {
                	//在唤醒其他线程前 若有指定 则优先执行barrierCommand
                    final Runnable command = barrierCommand;
                    if (command != null)
                        command.run();
                    //执行成功
                    ranAction = true;
                    //换代 *** 作 
                    //该方法会唤醒等待队列中的线程 创建一个新的Generation实例 并重置count
                    nextGeneration();
                    return 0;
                } finally {
                	
                	
                    if (!ranAction)
                        breakBarrier();
                }
            }

            // 若能执行到这里 代表仍有线程未到达同步点
            for (;;) {
                try {
                	
                    if (!timed)
                        trip.await();
                    else if (nanos > 0L)
                        nanos = trip.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                	
                    if (g == generation && ! g.broken) {
                        breakBarrier();
                        throw ie;
                    } else {
                    	//如果不满足if条件 说明线程不属于当前一代 中断                    	
                        Thread.currentThread().interrupt();
                    }
                }
				//检测到当前代已被破坏 抛出异常
                if (g.broken)
                    throw new BrokenBarrierException();
				//dowait执行过程中 或执行了nextGeneration或外界调用reset 已经换代 方法返回
                if (g != generation)
                    return index;
				
                if (timed && nanos <= 0L) {
                    breakBarrier();
                    throw new TimeoutException();
                }
            }
        } finally {
        	//无论执行结果如何 释放重入锁 以便同步队列中的其他线程尝试获取锁
            lock.unlock();
        }
    }
	

最后简要分析其余方法,源码如下:

	
	private void nextGeneration() {
        // 唤醒等待队列中全部线程
        trip.signalAll();
        // 重置count计数器
        count = parties;
        //创建一个新的Generation实例 表示换代
        generation = new Generation();
    }
	//未尝试获取锁与nextGeneration()同理
	private void breakBarrier() {
		//标记当前代已被破坏
        generation.broken = true;
        count = parties;
        trip.signalAll();
    }
	//获取总线程数 原子 *** 作 不需要获取锁
	public int getParties() {
        return parties;
    }
	//独占地判断当前代是否已经被破坏 保证其他线程不会同时对broken执行写 *** 作 线程安全
	public boolean isBroken() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return generation.broken;
        } finally {
            lock.unlock();
        }
    }
    //独占地重置CyclicBarrier
	public void reset() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            breakBarrier();   // break the current generation
            nextGeneration(); // start a new generation
        } finally {
            lock.unlock();
        }
    }
	
	//独占地获取正在等待的线程数量 保证其他线程不会同时对count执行写 *** 作 线程安全
	 public int getNumberWaiting() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return parties - count;
        } finally {
            lock.unlock();
        }
    }
栅栏使用的简单实例

假设有如下场景需求:假设有N组数据需要分别计算后,让各组计算结果对某一线程可见,再由该线程经过某些计算后得出最终结果,且该过程需要多次执行。可以考虑使用栅栏解决该问题,示例代码及结果如下

	public class CyclicBarrierTest {

    public static final int N = 3;

    private static class Compute implements Runnable{

        private final CyclicBarrier barrier;
        private final Integer id;

        private Compute(CyclicBarrier barrier, Integer id) {
            this.barrier = barrier;
            this.id = id;
        }

        @Override
        public void run() {
            System.out.println("开始执行第" + id + "组数据计算");
            try {
                //某些计算过程
                Thread.sleep(500);
                System.out.println("第" + id + "组数据计算完毕.主线程可见结果");
                barrier.await();
                System.out.println("全部" + id + "阻塞完毕");
            } catch (Exception e) {}
        }
    }

    public static void main(String[] args) throws InterruptedException {
        CyclicBarrier barrier = new CyclicBarrier(N+1);
        System.out.println("第一轮次计算");
        for (int i = 0; i < N; i++) {
            Thread thread = new Thread(new Compute(barrier,i));
            thread.start();
        }
        try {
            System.out.println("主线程阻塞等待其他线程计算结果");
            barrier.await();
            Thread.sleep(50);
            System.out.println("主线程得到三组计算结果 计算并得到最终结果");
        } catch (Exception e) {}
        barrier.reset();
        //仅创建一个线程 证明可重用即可
        Thread.sleep(50);
        System.out.println("第二轮次计算");
        Thread thread = new Thread(new Compute(barrier,1));
        thread.start();

    }

}
第一轮次计算
主线程阻塞等待其他线程计算结果
开始执行第1组数据计算
开始执行第0组数据计算
开始执行第2组数据计算
第0组数据计算完毕.主线程可见结果
第1组数据计算完毕.主线程可见结果
第2组数据计算完毕.主线程可见结果
全部2阻塞完毕
全部0阻塞完毕
全部1阻塞完毕
主线程得到三组计算结果 计算并得到最终结果
第二轮次计算
开始执行第1组数据计算
第1组数据计算完毕.主线程可见结果
栅栏的部分特点与闭锁CountDownLatch的区别
  • 栅栏用于等待一组线程全部执行到同步点,而闭锁用于某个或多个线程等待某外部事件执行完毕。
  • 栅栏可重用,闭锁是一次性的。
  • 栅栏用到了同步队列(获取释放锁)和等待队列(阻塞和唤醒线程),闭锁仅使用了同步队列。

以上便是本篇文章的全部内容
作者才疏学浅,如文中出现纰漏,还望指正

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5120712.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-11-17
下一篇 2022-11-17

发表评论

登录后才能评论

评论列表(0条)

保存