AQS的同步组件应用:CountDownLatch、Cyclicbarrier、Semaphore

AQS的同步组件应用:CountDownLatch、Cyclicbarrier、Semaphore,第1张

AQS的同步组件应用:CountDownLatch、Cyclicbarrier、Semaphore

同步:并发的线程在某一关键点上可能需要相互等待通知或互通消息,这种机制叫线程同步。

他们几个底层实现都是AQS,是jdk并发包里提供的并发工具类,注意,他们是工具类,跟我们平时自己写的或使用的工具类一样,目的是帮助我们处理并发的流程控制或解决我们多线程时遇到的一些问题(如:多线程互相等待通知、模拟并发等)

CountDownLatch

可以让线程等待其他线程完成 *** 作后自己再干。

它跟join类似,可以让主线程暂停子线程先去执行;它又比join灵活,join只能是子线程完全执行完才会通知主线程,countDownLatch可以让子线程在任意时刻显示通知主线程,主线程接到通知后可以跟子线程一块执行;实现功能又可以跟FutureTask差不多,FutureTask可以返回子线程处理任务的状态,主线程里不断的while循环判断判断子线程状态是否是done,这种方式占用的是cpu资源,因为他要忙循环不断检查所有子线程都处理完,countDownLatch的实现机制是主线程暂停后就“挂起”了,释放了cpu,等所有子线程完成后再通知主线程,这种方式引起线程间上下文切换,耗性能。

用法:

public class CountDownLatchTest {
    public static void main(String[] args) {
        CountDownLatch taskLock = new CountDownLatch(3);
       Thread thread1 = new Thread(()->{
               String name = Thread.currentThread().getName();
               System.out.println(name+":处理业务");
               taskLock.countDown();
               System.out.println(name+":调用完毕释放资源,同步状态剩余:"+taskLock.getCount());
       },"Thread-1");
        Thread thread2 = new Thread(()->{
                String name = Thread.currentThread().getName();
                System.out.println(name+":处理业务");
                taskLock.countDown();
                System.out.println(name+":调用完毕释放资源,同步状态剩余:"+taskLock.getCount());
        },"Thread-2");
        Thread thread3 = new Thread(()->{
                String name = Thread.currentThread().getName();
                System.out.println(name+":处理业务");
                taskLock.countDown();
                System.out.println(name+":调用完毕释放资源,同步状态剩余:"+taskLock.getCount());
        },"Thread-3");

        System.out.println("----------主线程 开始了,并开启多个线程处理业务------------");
        thread1.start();
        thread2.start();
        thread3.start();
        System.out.println("主线程遇到同步状态,需要做同步,进入等待状态,暂停了——————");
//主线程调用await方法后就被阻塞了,当子线程countdown后给同步状态减1,直到减到0,主线程继续执行
//      taskLock.await(2000, TimeUnit.MILLISECONDS);
        taskLock.await();
        System.out.println("主线程被通知到又可以继续走了"+taskLock.getCount());

    }
}
-----------------主线程 开始了,并开启多个线程处理业务------------
主线程遇到同步状态,需要做同步,进入等待状态,暂停了——————
Thread-3:处理业务
Thread-3:调用完毕释放资源,同步状态剩余:2
Thread-2:处理业务
Thread-2:调用完毕释放资源,同步状态剩余:1
Thread-1:处理业务
主线程被通知到又可以继续走了0
Thread-1:调用完毕释放资源,同步状态剩余:0

1、countDownLatch创建时传入同步状态数(给state赋值),几个子线程就设置几;

2、主线程调用await后被阻塞,直到state值为0被唤醒;

3、子线程调一次countDown就给state减1,直到减到0后唤醒主线程。

CountDown Latch底层实现利用的AQS,对同步状态变量做减法,主要的两个方法await和countDown,类似于监视器的await和notify,底部对线程的阻塞利用的LockSupport的park和unpark,通过countDown Latch进行多线程之间的调度,实现多线程的同步。

CyclicBarrier

同步屏障:让一组线程到达某个点后一块执行。

类似于一个屏障,拦住线程,等都到达后再同时放行,并且它的同步状态还可以再循环利用。它的应用可以模拟并发,也可以让一个线程等待多个线程都执行完后再综合处理各个线程的结果。

用法:

public class CyclicBarrierTest {

    static CyclicBarrier cyclicBarrier = new CyclicBarrier(3,new Thread(()->{
            System.out.println("初始化了---3个线程---大家一块执行");
    }));

    public static void main(String[] args) {
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(3,3,0L, TimeUnit.SECONDS,new ArrayBlockingQueue<>(3));
            threadPoolExecutor.submit(()->{
                try {
                    System.out.println("我是1=="+System.currentTimeMillis());
                    cyclicBarrier.await();
                    System.out.println("我是1处理逻辑=="+System.currentTimeMillis());
                } catch (Exception e) {}
            });
            threadPoolExecutor.submit(()->{
                try {
                    Thread.sleep(1000);
                    System.out.println("我是2=="+System.currentTimeMillis());
                    cyclicBarrier.await();
                    System.out.println("我是2处理逻辑=="+System.currentTimeMillis());
                } catch (Exception e) {}
            });
            threadPoolExecutor.submit(()->{
                try {
                    Thread.sleep(2000);
                    System.out.println("我是3=="+System.currentTimeMillis());
                    cyclicBarrier.await();
                    System.out.println("我是3处理逻辑=="+System.currentTimeMillis());
                } catch (Exception e) {}
            });
            threadPoolExecutor.shutdown();
    }
}
我是1==1640917282738
我是2==1640917283741
我是3==1640917284740
初始化了---3个线程---大家一块执行
我是3处理逻辑==1640917284740
我是1处理逻辑==1640917284740
我是2处理逻辑==1640917284740

1、CyclicBarrier创建时传入同步状态数(给state赋值),几个子线程就设置几;

2、子线程调用await后被阻塞;

3、子线程调一次await就给state减1,直到state值为0时全部线程才被唤醒

CyclicBarrier底层实现利用的AQS,对初始的同步状态变量做减法,利用可重入锁ReentrantLock和等待condition条件实现,加锁保证多线程时只有一个修改同步状态,然后判断同步修改后的同步状态是否等于0,等于0就唤醒等待在condition上的所有线程,不为0,当前线程就在condition条件上挂起。

Semaphore

信号量:它实现的也是线程之间的调度

它采用的同步状态递增的形式实现调度。初始化设置同步状态是0,一组线程让他增,一组线程让他减,始终围绕0做判断就行。信号量有公平和非公平两种策略。

用法:

public class SemaphoreTest3 {

    public static void main(String[] args) {
        Semaphore semaphore = new Semaphore(0);
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(3,3,0L, TimeUnit.SECONDS,new ArrayBlockingQueue<>(3));
        threadPoolExecutor.submit(()->{
            try {
                System.out.println("我是1=我给一个信号量="+System.currentTimeMillis());
                semaphore.release(1);
            } catch (Exception e) {}
        });
        threadPoolExecutor.submit(()->{
            try {
                Thread.sleep(1000);
                System.out.println("我是2==我给两个信号量="+System.currentTimeMillis());
                semaphore.release(2);
            } catch (Exception e) {}
        });
        threadPoolExecutor.submit(()->{
            try {
                Thread.sleep(2000);
                System.out.println("我是3=我给一个信号量="+System.currentTimeMillis());
                semaphore.release(1);
            } catch (Exception e) {}
        });
        threadPoolExecutor.shutdown();
        try {
            System.out.println("主线程走着走着就被暂停了,我需要4个信号量");
            semaphore.acquire(4);//同步状态是-4
            System.out.println("主线程被唤醒了,又可以继续了");
        } catch (InterruptedException e) {}
    }
}
主线程走着走着就被暂停了,我需要4个信号量
我是1=我给一个信号量=1640921214160
我是2==我给两个信号量=1640921215162
我是3=我给一个信号量=1640921216161
主线程被唤醒了,又可以继续了

调用acquire就是阻塞,唤醒的条件就是需要传入参数的数量,调用release就是唤醒线程,只不过条件是要到达acquire的值。

1、信号量初始时同步状态设置为0,

2、调用acquire传入值,阻塞线程,同步状态值改为0减去新传入的值(负数)

3、调用release传入值,同步状态加上该值,当同步状态再次为0的时候唤醒线程。

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5691082.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存