继续写博客吧,不然时间都被荒废了,这次一定要拿下 AQS
博客内容来源于 周冠亚老师的 Java 面试一战到底,主要是根据书中的内容对JUC 源码梳理
梳理的时候是将源码复制,翻译英文注释,然后对照书上纠正翻译问题和理解问题,基本上是一篇草稿blog
主要内容为:共享模式/独占模式 获取/释放 同步资源
穿插一些自问自答,以及待解答的小问题,推荐阅读作者原作
AbstractQueuedSynchronized 1 AbstractQueuedSynchronizer独占式获取锁,
aos 是 aqs 的父类
关键词:blocking locks, semaphores, events
该类中的方法:
// 独占模式下,同步器的当前所有者
private transient Thread exclusiveOwnerThread;
// 设置独占模式同步器的当前所有者
protected final void setExclusiveOwnerThread(Thread thread) {
exclusiveOwnerThread = thread;
}
// 获取独占模式同步器的当前所有者
protected final Thread getExclusiveOwnerThread() {
return exclusiveOwnerThread;
}
2 AQS 内部类
AQS 中有两个内部类:Node,ConditionObject
Node :静态内部类,用于表示队列的结点信息
ConditionObject :非静态内部类,用于实现条件阻塞
2.1 NodeNode 属性:waiterstatus, prev, next, thread, nextWaiter
通过Node 可以实现两种队列:
- 通过 prev 和 next 属性实现的 CLH 队列【同步队列,双向队列】
- nextWaiter 属性实现的在 Condition 条件上等待线程队列【条件队列,单向队列】
问题:为什么 同步队列是双向的,而 等待队列是单向的?
同步队列是由前驱结点解除BLOCKING 的,而等待队列是由其它线程 signal/signalAll() 解除等待
问题:两者之间的关联?
等待队列中的结点一定获得过同步资源,当被signal/signalAll() 后加入到同步队列
【上述待整理 条件模式 后验证是否正确,还有是否与公平模式和非公平模式有关】
static final class Node {
// SHARED 标识结点当前在共享模式下
static final Node SHARED = new Node();
// EXCLUSIVE 标识结点当前在独占模式下
static final Node EXCLUSIVE = null;
// CANCELLED 线程取消获取同步资源
static final int CANCELLED = 1;
// SIGNAL 当前线程的后继结点对应的线程需要被唤醒
static final int SIGNAL = -1;
// CONDITION 结点当前处于条件队列中
static final int CONDITION = -2;
// PROPAGATE 共享同步状态可以进行传播
static final int PROPAGATE = -3;
// waitStatus {CANCELLED, SIGNAL, CONDITION, PROPAGATE}
volatile int waitStatus;
// prev 当前结点的前驱结点,用于检查 waitStatus
volatile Node prev;
// 指向当前结点在释放时唤醒的后继结点
volatile Node next;
// 进入队列时的线程对象
volatile Thread thread;
// 存储条件队列的后继结点
Node nextWaiter;
// 当条件队列中的结点在共享模式下等待时返回 true
final boolean isShared() {
return nextWaiter == SHARED;
}
// 返回当前结点的前驱结点
final Node predecessor() throws NullPointerException {
// 1. 得到前驱结点
Node p = prev;
// 2. 若 prev 为空则抛出空指针异常
if (p == null)
throw new NullPointerException();
// 3. prev 不为空则返回
else
return p;
}
// 无参构造用于初始化 head,或设置 SHARED 标识
Node() { // Used to establish initial head or SHARED marker
}
// thread, mode 用于 addWaiter()
Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}
// thread, waitStatus 用于 Condition 条件队列
Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}
2.2 ConditionObject
ConditionObject 实现了 Condition 接口
条件模式留到下一张博客
3 AQS 属性head,tail, state
// head 队列头结点,dummy node 不代表任何线程,setHead() 修改
private transient volatile Node head;
// 队列尾结点,enq() 修改
private transient volatile Node tail;
// 同步状态
private volatile int state;
// 返回 state 状态
protected final int getState() {
return state;
}
// 设置 state 状态
protected final void setState(int newState) {
state = newState;
}
4 AQS 独占模式
同步资源的状态:独占,共享
独占模式的功能:获取同步资源,释放同步资源
4.1 acquire() 独占式获取同步资源public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
实现流程:
- 调用 tryAcquire() 尝试获取同步资源,一般在AQS子类实现
- 调用 addWaiter() 将当前调用 acquire() 方法的线程入队
- 调用 acquireQueued() 在等待队列中获取同步资源
问题1:验证上面 3. 等待队列中获取同步资源是否准确?
问题2:线程中断是如何处理的?注释部分 – ignoring interrupts
tryAcquire() 具体逻辑需要 AQS 子类实现
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
4.1.2 addWaiter( )
addWaiter() 将未获取到同步资源使用权的线程加入同步队列中
// mode Node.EXCLUSIVE 独占, Node.SHARED 共享
private Node addWaiter(Node mode) {
// 1. 用当前线程创建一个Node 结点
Node node = new Node(Thread.currentThread(), mode);
// 2. 获取队列的尾节点
Node pred = tail;
// 3.1 若当前队列非空
if (pred != null) {
// 3.1.1 node 结点的 prev 引用指向 tail
node.prev = pred;
// 3.1.2 CAS *** 作设置node 为新的 tail,若失败则执行 enq()
if (compareAndSetTail(pred, node)) {
//3.1.3 CAS 成功,将 prev [old tail] 的 next 引用指向 node
pred.next = node;
// 3.1.4 返回 new tail
return node;
}
}
// 3.2 当前队列为空,或队列非空时通过CAS入队尾失败[多线程竞争入队]
// 通过 enq() 方法自旋
enq(node);
// 4. 返回入尾成功的 new tail
return node;
}
注意:mode {Node.EXCLUSIVE, Node.SHARED} 即独占模式和共享模式都是通过 addWaiter() 添加结点到同步队列的
addWaiter(Node mode) 实现流程:
- 将当前线程封装到 Node 中,通过 Thread.currentThread() 获取当前线程
Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}
问题:为什么将Node.nextWaiter 设置为 mode,即 Node.EXCLUSIVE
共享模式:作为判断后继结点是否是传播模式
if (s == null || s.isShared())
doReleaseShared();
final boolean isShared() {
return nextWaiter == SHARED;
}
- 队列非空使用CAS将Node 添加到队尾
- 队尾为空使用 enq() 添加Node【多了初始化队列的处理】
// cas (MemoryValue, OldValue, ModifyValue)
private final boolean compareAndSetTail(Node expect, Node update) {
return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
}
CAS 添加结点入尾流程:
- 通过 this, tailOffset 获取 MemoryValue
public final native boolean compareAndSwapObject(Object var1, long var2, Object var4, Object var5);
- 机制 MemoryValue == OldValue ? OldValue = ModifyValue : cas
① 若队列为空,CAS 设置头结点【会执行 enq() 说明队列为空,会执行 if 分支】
② CAS 设置尾结点
private Node enq(final Node node) {
//1. 若if(cas)失败则继续执行for(;;)
for (;;) {
// 2. 获取尾结点
Node t = tail;
// 3.1 队列为空
if (t == null) { // Must initialize
// 3.1.1 CAS 创建头结点
if (compareAndSetHead(new Node()))
// 3.1.2 CAS 成功,队列中仅有一个结点,头结点 = 尾结点
tail = head;
} else {
// 3.2 队列非空
node.prev = t;
// 3.2.1 CAS 设置 node = new tail
if (compareAndSetTail(t, node)) {
// 3.2.2 old tail 的 next 引用指向 node
t.next = node;
// 自旋结束,返回原尾结点
return t;
}
}
}
}
4.1.3 acquireQueued( )
线程成功进入同步队列后,通过 acquireQueued() 方法在同步队列中等待同步资源释放
// 结点加入队列后,尝试在同步队列中自旋获得资源
final boolean acquireQueued(final Node node, int arg) {
// 标记是否成功获取到资源, 置为 false 标识获取资源成功,finally 中 true 取消获取
boolean failed = true;
try {
// 标记线程是否被中断
boolean interrupted = false;
for (;;) {
// 1. 获取 node 的前驱结点
final Node p = node.predecessor();
// 2. node 的前驱是 head, 则head释放资源后,node尝试获取资源
if (p == head && tryAcquire(arg)) {
// 2.1 node 获取资源成功,设置 node 为新的头结点
setHead(node);
// 2.2 原头结点的后继结点 next 设置为 null, 便于进行 GC
p.next = null; // help GC
// 2.3 标记已经获取到资源
failed = false;
// 2.4 返回 interrupted 状态
return interrupted;
}
// 3.1 检查并更新无法获取资源的结点的状态
// 3.2 阻塞线程,并检查中断状态
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
// 4. failed = true 且跳出for循环,说明抛出异常,即 p = null, node 为头结点
if (failed)
// 4.1 取消对资源的获取
cancelAcquire(node);
}
}
acquireQueued() 执行流程:
- 获取 node 的前驱结点p,若p为头节点说明p已经获取到资源,封装 node的线程尝试获取资源
- 若前驱结点p == null,说明 node = head,不会执行 if 而是直接跳出
for(; ;)
执行 finally 取消获取资源 - 若前驱结点
p != null && p != head
,则判断是否要阻塞当前线程,以及检查当前线程的中断标识 - 获取未取消获取同步资源的前驱结点的 waitStatus,自旋直到 p.waitStatus = Node.SIGNAL,然后阻塞线程
- 检查node线程的中断标志位,若中断则保留中断标识
说明:从 5
可以回答上述:acquire() ignoring interrupts
的问题:即在获取同步资源的过程中不处理中断,仅仅是保留中断标识
predecessor() 获取结点的前驱结点,若获取的前驱节点为空,方法会抛出 NullPointerException 异常
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
4.1.3.2 setHead()
执行 setHead() 设置 node 为 head 前提:
① node 的 prev 为 head
② node.tryAcquire() = true
private void setHead(Node node) {
head = node;
// head 为 dummy 结点,其对应的 therad 属性为null
// 这是上述判断 p = null 即 node = head 时取消获取资源的原因
node.thread = null;
node.prev = null;
}
4.1.3.3 shouldParkAfterFailedAcquire()
线程获取资源失败后,判断是否阻塞线程
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// 1. 获取前驱结点的状态, 从而判断是否阻塞线程
int ws = pred.waitStatus;
// 2.1 若 prev 设置为 SIGNAL
if (ws == Node.SIGNAL)
// 2.1.1 prev 已经设置了请求释放的状态,则阻塞当前线程
return true;
// 2.2 waitStatus = CANCELLED > 0 即 prev 取消等待资源[超时或中断]
if (ws > 0) {
// 2.2.1 跳过所有 CANCELLED 的 prev 结点
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
// 2.2.2 设置最近一个 waitStatus < 0 的结点为 node 的 prev
pred.next = node;
} else { // 1.3 waitStatus = 0 || waitStatus = -3
// 2.3.1 CAS 设置 prev 的 waitStatus = Node.SIGNAL
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
// 3. 返回 false, 不阻塞线程,返回 acquireQueued() 进行下一次自旋
return false;
}
4.1.3.4 parkAndCheckInterrupt()
阻塞线程,当线程恢复后判断线程是否中断
private final boolean parkAndCheckInterrupt() {
// 1. park() 使当前线程进入 waiting 状态,可通过 unpark(), interrupt() 唤醒该线程
LockSupport.park(this);
// 2. 返回线程是否被中断,会清除中断标识
return Thread.interrupted();
}
4.1.4 selfInterrupt( )
若 parkAndCheckInterrupt() = true 则 acquireQueue() 设置 interrupted = true 并返回 interrupted 的状态
回到 acquire() 若 获取同步资源失败 tryAcquire(arg) = false && acquireQueued(node, arg) = true
则会执行 selfInterrupt() 方法
acquireQueue(node,arg) 方法中,仅当node获取资源成功后才会返回 interrupted
而 interrupted 状态仅在线程阻塞后执行 parkAndCheckInterrupt()
才会更新
这也对应了 acquire(arg)
再获取到同步资源后才处理中断
// 设置线程中断,并不对中断做出响应
static void selfInterrupt() {
Thread.currentThread().interrupt();
}
总结: acquire() 方法的执行流程
- 先通过
tryAcquire()
尝试获取锁【该方法由子类实现】 - 获取失败,将线程封装成node添加到同步队列,在同步队列中循环调用
tryAcquire()
获取锁 - 循环调用
for(;;)
即自旋,若前驱已经请求释放资源则停止自旋,执行parkAndCheckInterrupt
阻塞线程
问题:线程执行 parkAndCheckInterrupt() 之后,是进入等待队列吗?还是继续在同步队列?
问题:阻塞与等待的区别?
上述 acquireQueued(node, arg)
的流程和synchronized 锁从轻量级锁膨胀为重量级锁的过程
还有验证下 其中的参数 arg 是否用于重入锁
问题:结点设置为 head 之后,其 waitStatus, 结点加入同步队列后的 waitStatus
① 封装结点到Node
Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode; // node.nextWaiter = Node.EXCLUSIVE
this.thread = thread;
}
② 判断是否阻塞线程 shouldParkAfterFailedAcquire(pred, node)
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
...
return true;
if (ws > 0) {
...
} else {
....
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
综合① ② ,说明:结点加入同步队列时并没有设置 waitStatus, 也即该值是默认0
,当该结点作为前驱结点,后继结点需要获取前驱结点的 waitStatus时,才将该值设置为 Node.SIGNAL
问题:结点是怎么被设置为head的呢?
// acquireQueued(node, arg)
if (p == head && tryAcquire(arg)) {
setHead(node);
结点的前驱为头结点,并且该结点获取资源成功
而在setHead(node) 的逻辑里,将 head.thread = null 即将该线程释放出来
那么此时 head 的 waitStatus 呢?或许应该从获取到资源后的释放逻辑来看
翻到 4.2.2 unparkSuccessor() 可以看到通过CAS设置head.waitStatus = 0
// ① release(arg)
if (h != null && h.waitStatus != 0)
// 1.2.1 头结点不为空且在同步队列中,则唤醒后继结点,让后继结点竞争资源
unparkSuccessor(h);
// ② unparkSuccessor(h)
if (ws < 0)
// 2.1 CAS 将结点状态修改为正常结点初始化状态
compareAndSetWaitStatus(node, ws, 0);
综上来说,结点 waitStatus 的变化过程为:
- 初始时为默认值 node.waitStatus = 0
- 执行 tryAcquire() 失败到 addWaiter() 该值未变化
- 执行 acquireQueued() 该节点的后继结点在队列中自旋获取同步资源,设置 node.waitStatus = Node.SIGNAL
- 获取到资源后释放资源唤醒后继结点时,设置 node.waitStatus = 0,此时 node = head
问题:线程是如何被唤醒,获取资源的?
4.2 release() 独占模式下释放资源release() 独占模式下释放资源
public final boolean release(int arg) {
//1. 判断 tryRelease() 释放资源成功 ?
if (tryRelease(arg)) {
// 1.1 释放资源成功,获取同步队列头结点
Node h = head;
// 1.2 判断头结点和头节点的 waitStatus [0 正常同步结点的初始化值]
if (h != null && h.waitStatus != 0)
// 1.2.1 头结点不为空且在同步队列中,则唤醒后继结点,让后继结点竞争资源
unparkSuccessor(h);
// 1.3 释放资源成功返回 true, 即使if条件不满足
return true;
}
//2. tryRelease() 释放独占资源失败,返回 false
return false;
}
4.2.1 tryRelease()
tryRelease() 尝试释放同步资源,释放成功返回 true,需要AQS的子类实现
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
4.2.2 unparkSuccessor()
unparkSuccessor() 唤醒等待队列中的后继结点
private void unparkSuccessor(Node node) {
// 1. ws 结点的状态
int ws = node.waitStatus;
// 2. ws < 0 结点处于 {SIGNAL, CONDITION, PROPAGATE}
if (ws < 0)
// 2.1 CAS 将结点状态修改为正常结点初始化状态
compareAndSetWaitStatus(node, ws, 0);
// 3. 唤醒后继结点 s
Node s = node.next;
// 3.1 若后继s 为空, 或取消获取资源{CANCELLED}
if (s == null || s.waitStatus > 0) {
// 3.1.1 将 s 置为空
s = null;
// 3.1.2 从 tail 向前遍历找到 node 后的第一个待唤醒的结点
for (Node t = tail; t != null && t != node; t = t.prev)
//循环结束条件为 t == null 或者 t = node,由后向前遍历时 s 会被多次赋值,直到找到靠近 node 的非空后继才会停止循环
if (t.waitStatus <= 0)
s = t;
}
// 3.2 后继 s 非空,唤醒 s 对应的线程
if (s != null)
LockSupport.unpark(s.thread);
}
作业:总结 unparkSuccessor(node) 方法的执行流程
- release(arg) 方法中获取 head 后,将其作为入参调用 unparkSuccessor(node),node = head
- CAS 设置 head.waitStatus = 0
- 获取 head 的第一个非空且未取消的结点
- 若后继结点取消获取资源,则需要从后向前遍历找到离 head 最近的未取消且不为null的结点
- 若后继为空则不必执行 unpark(thread)
同步队列的头节点为虚节点dummy node,节点的 thread = null,AQS 中维护的同步队列除去头节点以外的部分才是真正等待资源的队列,即使同步队列是空的,包装 thread的Node也会排在同步队列的第二位
作业:同步队列存储示意图
5 AQS 共享模式AQS 独占模式和共享模式的区别:同一时刻能否有多个线程获取同步资源
5.1 acquireShared() 共享模式获取资源acquireShared() 共享模式获取资源,可用于实现 lock
public final void acquireShared(int arg) {
// 1. tryAcquireShared(arg) 获取到共享资源会返回正数
if (tryAcquireShared(arg) < 0)
// 2. 没有获取到资源执行 if中的 doAcquireShared(arg)
doAcquireShared(arg);
}
独占模式:acquire(arg)->tryAcquire(arg)->addWaiter(Node.EXCLUSIVE,arg)->acquireQueued(node)->selfInterrupt()
共享模式:acquireShared(arg)->tryAcquireShared(arg)->doAcquireShared(arg)
tryAcquireShared(arg) 以共享模式尝试获取资源,由子类实现
protected int tryAcquireShared(int arg) {
throw new UnsupportedOperationException();
}
作业:子类实现 tryAcquireShared(arg) 的流程
① ReentrantLock() 中是否有该方法的实现?
② Semaphore,CountDownLatch,CyclicBarrier,ReentrantReadWriteLock 对该方法的实现的区别
5.1.2 doAcquireShared()doAcquiredShared() 使线程进入同步队列,共享不可中断模式
private void doAcquireShared(int arg) {
// 1. 通过CAS自旋方式入尾,与独占模式的区别:传入参数
// 独占模式:Node.EXCLUSIVE, 共享模式:Node.SHARED
final Node node = addWaiter(Node.SHARED);
// 2. 标记是否成功获取到资源,为 head 节点时 failed = true 取消获取资源
boolean failed = true;
try {
// 3. 标记是否中断
boolean interrupted = false;
// 4. 自旋
for (;;) {
// 4.1 获取node的前继节点 p
final Node p = node.predecessor();
// 4.2 若 p 为 head, 则尝试获取共享资源
if (p == head) {
// 4.2.1 p 为 head 尝试获取共享资源
int r = tryAcquireShared(arg);
// 4.2.2 获取共享资源成功,独占模式是 boolean
if (r >= 0) {
// 4.2.2.1 将 node 设置为 head,检查后继节点是否在共享模式下等待
setHeadAndPropagate(node, r);
// 4.2.2.2 原头节点的next引用置为 null,便于GVM对头节点进行GC
p.next = null; // help GC
// 4.2.2.3 若线程被中断
if (interrupted)
// 4.2.2.3.1 维护中断状态
selfInterrupt();
// 4.2.2.4 标记成功获取资源
failed = false;
// 4.2.2.5 返回,退出函数
return;
}
}
// 4.3 若node的前驱不是头节点或者node获取共享资源失败
if (shouldParkAfterFailedAcquire(p, node) &&
// 4.3.1 检查并更新无法获取资源的节点状态
parkAndCheckInterrupt())
// 4.3.2 阻塞线程,并检查中断状态
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
与独占模式的 acquireQueued(arg) 代码逻辑相同
共享模式与独占模式的不同:
① addWaiter(Node mode) 放到 doAcquireShared(arg) 中执行
② tryAcquireShared(arg) 返回int值,semphore, countDownLatch, ReentrantReadWriteLock 均重写了该方法,用于针对执行情况做相应的处理
③ setHeadAndPropagate(node, r) 可传播模式设置 head 结点,共享模式下不需要等待释放资源再唤醒节点,因此若后继结点在共享模式下等待,可以接续设置 head结点
④ doAcquireShared(arg) 不返回中断状态,而独占模式下是获取到资源后判断中断表示,决定是否中断
问题:独占模式与共享模式下结点加入同步队列是否有区别?
目前看来好像没区别,都是调用addWaiter(mode)
,区别在入参,执行逻辑是相同的
// addWaiter(mode)
Node node = new Node(Thread.currentThread(), mode);
// class Node
Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}
而 mode 用于设置 node.nextWiter
共享模式中,获取资源成功的线程调用 setHeadAndPropagate(node, r)
设置队列的头节点,并检查后继节点是否是共享模式
private void setHeadAndPropagate(Node node, int propagate) {
// 1. 获取队列的头节点
Node h = head; // Record old head for check below
// 2. 设置当前节点为头节点
setHead(node);
// 3. 唤醒后继结点
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
// 3.1 获取当前结点 node 的后继结点 s
Node s = node.next;
// 3.2 当前节点node 的后继 s = null 或者 s 是共享模式
if (s == null || s.isShared())
// 3.3.1 唤醒后继结点
doReleaseShared();
}
}
唤醒后继结点的三种条件:
① propagate > 0 后续结点需要被唤醒,propagate = r 获取共享资源成功 r > 0
② h == null || h.waitStatus < 0
此处h = old head
头结点为空或 waitStatus < 0
非取消状态
③ h = head
, 执行了setHead(node)
此处 h = new head
当前结点为空或 waitStatus < 0
非取消获取共享资源
思考:上述 无论是 old head ,new head, 还是后继 结点若为null 均执行唤醒后继结点的原因?
共享模式下:当前线程拿到资源后不必等到释放资源再通知后继结点,为null的时候不占有资源,自然可以通知后继结点
5.2 releaseShared() 共享模式释放资源releaseShared() 共享模式释放资源
public final boolean releaseShared(int arg) {
// 1. 尝试释放共享资源
if (tryReleaseShared(arg)) {
// 2. 尝试释放资源成功后,调用 doReleaseShared() 唤醒后继结点
doReleaseShared();
return true;
}
// 2. 尝试释放共享资源失败返回 false
return false;
}
5.2.1 tryReleaseShared()
tryReleaseShared() 由子类实现
protected boolean tryReleaseShared(int arg) {
throw new UnsupportedOperationException();
}
5.2.2 doReleaseShared()
tryReleaseShared() 执行成功,执行 doReleaseShared() 唤醒后继结点
private void doReleaseShared() {
// 自旋释放后继结点
for (;;) {
// 1. 自旋,动态获取队列头节点
Node h = head;
// 2. 头结点非空且不等于尾结点[队列中至少有两个结点]
if (h != null && h != tail) {
// 2.1 获取头节点的 waitStatus -- ws
int ws = h.waitStatus;
// 2.2 头节点状态为 SIGNAL
if (ws == Node.SIGNAL) {
// 2.2.1 CAS 设置头结点状态为 0 ,若失败则继续进行下次循环
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
// 2.2.2 CAS 成功,唤醒后继结点
unparkSuccessor(h);
}
// 2.3 头结点状态为 0 [正常结点初始化值]
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
// 2.3.1 在头结点状态为 0 的基础上设置头结点状态为 Node.PROPAGATE [可向后传播唤醒]
continue; // loop on failed CAS
// 2.3.2 CAS 失败继续 进行下次循环
}
// 3. 自旋跳出条件,若head不变就跳出自旋,若head一直变化就一直自旋
if (h == head) // loop if head changed
break;
}
}
问题:共享模式下 node.waitStatus 的状态机
- 初始时为默认值 node.waitStatus = 0
- 执行 tryAcquireShared() 失败到执行 doAcquireShared() 该值未变化
- 执行 shouldParkAfterFailedAcquire(pred, node) CAS设置 node.waitStatus = Node.SIGNAL
- 释放资源时CAS 设置 head.waitStatus = 0
- 或者未执行
3
即未阻塞node 时,设置 node.waitStatus = Node.PROPAGATE
问题:共享模式下执行 shouldParkAfterFailedAcquire(pred, node) 会执行吗?或者说 共享模式下 node.waitStatus会否设置为 Node.SIGNAL
当前驱结点不是 head,或者执行 tryAcquireShared(arg)失败的时候会执行
问题:doReleaseShared() 的处理流程
- 获取队列头结点,若头节点不为空且队列不为空则获取头结点的 waitStatus
- 若头节点 h.waitStatus = Node.SIGNAL 说明后继结点执行了
shouldParkAfterFailedAcquire(pred, node)
则CAS 修改 h.waitStatus = 0 并解除后继结点的阻塞状态 【注意:执行完之后并没有退出循环】 - 若头结点 h.waitStatus = 0,说明执行的是
setHeadAndPropagate(node, r)
中的doReleaseShared()
设置 h.waitStatus = Node.PROPAGATE 并继续循环 - 循环退出条件 h == head
问题:上述流程中好像并没有修改 head, 所以会直接退出 doReleaseShared()
吗?那为什么不直接在解除后继结点的阻塞时就退出循环呢?
思考这个问题,一定要放在多线程的角度下,
① 多个线程执行 acquireShared()
,如果尝试获取资源失败都会进入到doAcquireShared(arg)
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
② 一旦尝试获取资源失败都会调用 addWaiter(Node.SHARED)
到同步队列中排队
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED);
③ 加入到同步队列中后所有结点均自旋获取同步资源,只有前驱结点为 head 的结点才能执行tryAcquireShared(arg)
获取同步资源
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
④ 若前驱为head的结点执行tryAcquireShared(arg)<0
,即失败
则阻塞当前结点的后继结点shouldParkAfterFailedAcquire(p, node)
,而该结点继续自旋执行 tryAcquireShared(arg)
直到获取资源成功
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
....
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
⑤ 若前驱为head的结点执行 tryAcquireShared(arg)
成功,即获得同步资源
则执行setHeadAndPropagate(node, r)
// doAcquireShared(arg)
if (r >= 0) {
setHeadAndPropagate(node, r);
// setHeadAndPropagate(Node node, int propagate)
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
setHead(node);
自此除了 mode 不同外,基本和独占模式的处理流程相同
那么不同在哪里呢?
回到 5.1.2 节可以看到,区别在于:
独占模式下执行:setHead(node)
共享模式下执行:setHeadAndPropagate(node, r)
相同点都是:设置成功获取资源的结点为head结点,这样后继结点比如node2
由于前驱为node1=head
就可以执行tryAcquire(arg)
或tryAcquireShared(arg)
获取同步资源了
但是 setHeadAndPropagate(node, r)
又执行了释放资源的 *** 作doReleaseShared()
假设当前已获取同步资源的结点是node1
,则node2
开始执行流程3-5
,同样会执行doReleaseShared()
也同样会引起下一个结点node3
执行 doReleaseShared()
如此:必然需要判断 head
是否变化,若 head
一直动态变化,则继续自旋解除阻塞的后继结点,或者在未阻塞模式下设置 Node.PROPAGATE,确保该传播模式继续进行。
后记:后面本来还有 条件模式,但是内容太多了,而且还没有理解,所以放到下一章总结
真的感谢周冠亚老师写这本书,还有CSDN写博客的平台,一遍是不行的,还有很多没理解,等到把相关的工具类也总结后,再来2-3遍,纠正理解不当之处,同时进一步发掘问题
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)