【Java进阶营】Java源码分析-带你认识什么是AQS(中)

【Java进阶营】Java源码分析-带你认识什么是AQS(中),第1张

前提回顾

AQS三部曲之中篇,本篇文章主要面向与对于AQS各个组件的实现的方式和原理。

AQS在各同步器内的Sync与State实现
什么是state机制

提供volatile变量state,用于同步线程之间的共享状态。

通过CAS和volatile保证其原子性和可见性,对应源码里的定义:

/**

  • 同步状态
    */
    private volatile int state;

/**
*cas
*/
protected final boolean compareAndSetState(int expect, int update) {
// See below for intrinsics setup to support this
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}

不同实现类的Sync与State:

基于AQS构建的Synchronizer包括ReentrantLock,Semaphore,CountDownLatch, ReetrantRead WriteLock,FutureTask等,这些Synchronizer实际上最基本的东西就是原子状态的获取和释放,只是条件不一样而已。

ReentrantLock

需要记录当前线程获取原子状态的次数,如果次数为零,那么就说明这个线程放弃了锁(也有可能其他线程占据着锁从而需要等待),如果次数大于1,也就是获得了重进入的效果,而其他线程只能被park住,直到这个线程重进入锁次数变成0而释放原子状态。在此我向大家推荐一个架构学习交流圈。交流学习指导伪鑫:1253431195(里面有大量的面试题及答案)里面会分享一些资深架构师录制的视频录像:有Spring,MyBatis,Netty源码分析,高并发、高性能、分布式、微服务架构的原理,JVM性能优化、分布式架构等这些成为架构师必备的知识体系。还能领取免费的学习资源,目前受益良多

以下为ReetrantLock的FairSync的tryAcquire实现代码解析。

//公平获取锁

protected final boolean tryAcquire(int acquires) {

final Thread current = Thread.currentThread();

int c = getState();

//如果当前重进入数为0,说明有机会取得锁

if (c == 0) {

    //如果是第一个等待者,并且设置重进入数成功,那么当前线程获得锁

    if (isFirst(current) &&

        compareAndSetState(0, acquires)) {

        setExclusiveOwnerThread(current);

        return true;

  }

}

//如果当前线程本身就持有锁,那么叠加重进入数,并且继续获得锁

else if (current == getExclusiveOwnerThread()) {

    int nextc = c + acquires;

    if (nextc < 0)

        throw new Error("Maximum lock count exceeded");

    setState(nextc);

    return true;

}

//以上条件都不满足,那么线程进入等待队列。

return false;

}

Semaphore

则是要记录当前还有多少次许可可以使用,到0,就需要等待,也就实现并发量的控制,Semaphore一开始设置许可数为1,实际上就是一把互斥锁。以下为Semaphore的FairSync实现

protected int tryAcquireShared(int acquires) {
Thread current = Thread.currentThread();
for (;😉 {
Thread first = getFirstQueuedThread();
//如果当前等待队列的第一个线程不是当前线程,那么就返回-1表示当前线程需要等待
if (first != null && first != current)
return -1;
//如果当前队列没有等待者,或者当前线程就是等待队列第一个等待者,那么先取得semaphore还有几个许可证,并且减去当前线程需要的许可证得到剩下的值
int available = getState();
int remaining = available - acquires;
//如果remaining < 0,那么反馈给AQS当前线程需要等待,
//如果remaining > 0,并且设置availble成功设置成剩余数,那么返回剩余值(>0),
//也就告知AQS当前线程拿到许可,可以继续执行。
if (remaining < 0 || compareAndSetState(available, remaining))
return remaining;
}
}

CountDownLatch

闭锁则要保持其状态,在这个状态到达终止态之前,所有线程都会被park住,闭锁可以设定初始值,这个值的含义就是这个闭锁需要被countDown()几次,因为每次CountDown是sync.releaseShared(1),而一开始初始值为10的话,那么这个闭锁需要被countDown()十次,才能够将这个初始值减到0,从而释放原子状态,让等待的所有线程通过。

await时候执行,只查看当前需要countDown数量减为0了,如果为0,说明可以继续执行,否则需要park住,等待countDown次数足够,并且unpark所有等待线程。

public int tryAcquireShared(int acquires) {
return getState() == 0? 1 : -1;
}

countDown 时候执行,如果当前countDown数量为0,说明没有线程await,直接返回false而不需要唤醒park住线程,如果不为0,得到剩下需要 countDown的数量并且compareAndSet,最终返回剩下的countDown数量是否为0,供AQS判定是否释放所有await线程。

public boolean tryReleaseShared(int releases) {
for (;😉 {
int c = getState();
if (c == 0)
return false;
int nextc = c - 1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}

FutureTask

需要记录任务的执行状态,当调用其实例的get方法时,内部类Sync会去调用AQS的acquireSharedInterruptibly()方法,而这个方法会反向调用Sync实现的tryAcquireShared()方法,即让具体实现类决定是否让当前线程继续还是park,而FutureTask的tryAcquireShared方法所做的唯一事情就是检查状态,如果是RUNNING状态那么让当前线程park。

而跑任务的线程会在任务结束时调用FutureTask 实例的set方法(与等待线程持相同的实例),设定执行结果,并且通过unpark唤醒正在等待的线程,返回结果。

get时待用,只检查当前任务是否完成或者被Cancel,如果未完成并且没有被cancel,那么告诉AQS当前线程需要进入等待队列并且park住

protected int tryAcquireShared(int ignore) {
return innerIsDone()? 1 : -1;
}

//判定任务是否完成或者被Cancel

boolean innerIsDone() {
return ranOrCancelled(getState()) && runner == null;
}

get时调用,对于CANCEL与其他异常进行抛错

V innerGet(long nanosTimeout) throws InterruptedException, ExecutionException, TimeoutException {

if (!tryAcquireSharedNanos(0,nanosTimeout))

    throw new TimeoutException();

if (getState() == CANCELLED)

    throw new CancellationException();

if (exception != null)

    throw new ExecutionException(exception);

return result;

}

//任务的执行线程执行完毕调用(set(V v))

void innerSet(V v) {

for (;;) {

    int s = getState();

    //如果线程任务已经执行完毕,那么直接返回(多线程执行任务?)

    if (s == RAN)

        return;

    //如果被CANCEL了,那么释放等待线程,并且会抛错

    if (s == CANCELLED) {

        releaseShared(0);

        return;

}

如果成功设定任务状态为已完成,那么设定结果,unpark等待线程(调用get()方法而阻塞的线程),以及后续清理工作(一般由FutrueTask的子类实现)

    if (compareAndSetState(s, RAN)) {

        result = v;

        releaseShared(0);

        done();

        return;

}

}

}

解析原理

以上4个AQS的使用是比较典型,然而有个问题就是这些状态存在哪里呢?并且是可以计数的。从以上4个example,我们可以很快得到答案,AQS提供给了子类一个int state属性。并且暴露给子类getState()和setState()两个方法(protected)。

这样就为上述状态解决了存储问题,ReentrantLock可以将这个state用于- 存储当前线程的重进入次数,Semaphore可以用这个state存储许可数,CountDownLatch则可以存储需要被countDown的次数,而Future则可以存储当前任务的执行状态(RUNING,RAN,CANCELL)。其他的Synchronizer存储他们的一些状态。

AQS留给实现者的方法主要有5个方法

其中tryAcquire,tryRelease和isHeldExclusively三个方法为需要独占形式获取的synchronizer实现的,比如线程独占ReetranLock的Sync,
    ReentrantLock内部Sync类实现的是tryAcquire,tryRelease, isHeldExclusively三个方法(因为获取锁的公平性问题,tryAcquire由继承该Sync类的内部类FairSync和NonfairSync实现)

而tryAcquireShared和tryReleasedShared为需要共享形式获取的synchronizer实现。

Semaphore内部类Sync则实现了tryAcquireShared和tryReleasedShared(与CountDownLatch相似,因为公平性问题,tryAcquireShared由其内部类FairSync和NonfairSync实现)。

CountDownLatch内部类Sync实现了tryAcquireShared和tryReleasedShared。

FutureTask内部类Sync也实现了tryAcquireShared和tryReleasedShared。

欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/langs/729366.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-04-27
下一篇 2022-04-27

发表评论

登录后才能评论

评论列表(0条)

保存