Java Interview In Action - AQS

Java Interview In Action - AQS,第1张

继续写博客吧,不然时间都被荒废了,这次一定要拿下 AQS

博客内容来源于 周冠亚老师的 Java 面试一战到底,主要是根据书中的内容对JUC 源码梳理

梳理的时候是将源码复制,翻译英文注释,然后对照书上纠正翻译问题和理解问题,基本上是一篇草稿blog

主要内容为:共享模式/独占模式 获取/释放 同步资源

穿插一些自问自答,以及待解答的小问题,推荐阅读作者原作

AbstractQueuedSynchronized 1 AbstractQueuedSynchronizer

独占式获取锁,
aos 是 aqs 的父类
关键词:blocking locks, semaphores, events

该类中的方法:

// 独占模式下,同步器的当前所有者
private transient Thread exclusiveOwnerThread;
// 设置独占模式同步器的当前所有者
protected final void setExclusiveOwnerThread(Thread thread) {
    exclusiveOwnerThread = thread;
}
// 获取独占模式同步器的当前所有者
protected final Thread getExclusiveOwnerThread() {
    return exclusiveOwnerThread;
}
2 AQS 内部类

AQS 中有两个内部类:Node,ConditionObject

Node :静态内部类,用于表示队列的结点信息

ConditionObject :非静态内部类,用于实现条件阻塞

2.1 Node

Node 属性:waiterstatus, prev, next, thread, nextWaiter
通过Node 可以实现两种队列:

  1. 通过 prev 和 next 属性实现的 CLH 队列【同步队列,双向队列】
  2. nextWaiter 属性实现的在 Condition 条件上等待线程队列【条件队列,单向队列】

问题:为什么 同步队列是双向的,而 等待队列是单向的?

同步队列是由前驱结点解除BLOCKING 的,而等待队列是由其它线程 signal/signalAll() 解除等待

问题:两者之间的关联?

等待队列中的结点一定获得过同步资源,当被signal/signalAll() 后加入到同步队列
【上述待整理 条件模式 后验证是否正确,还有是否与公平模式和非公平模式有关】

static final class Node {
        // SHARED 标识结点当前在共享模式下
        static final Node SHARED = new Node();
        // EXCLUSIVE 标识结点当前在独占模式下
        static final Node EXCLUSIVE = null;
        // CANCELLED 线程取消获取同步资源
        static final int CANCELLED =  1;
        // SIGNAL 当前线程的后继结点对应的线程需要被唤醒
        static final int SIGNAL    = -1;
        // CONDITION 结点当前处于条件队列中
        static final int CONDITION = -2;
        // PROPAGATE 共享同步状态可以进行传播
        static final int PROPAGATE = -3;
        // waitStatus {CANCELLED, SIGNAL, CONDITION, PROPAGATE}
        volatile int waitStatus;
        
        // prev 当前结点的前驱结点,用于检查 waitStatus
        volatile Node prev;
		// 指向当前结点在释放时唤醒的后继结点
        volatile Node next;

        // 进入队列时的线程对象
        volatile Thread thread;

        // 存储条件队列的后继结点
        Node nextWaiter;

        // 当条件队列中的结点在共享模式下等待时返回 true
        final boolean isShared() {
            return nextWaiter == SHARED;
        }

        // 返回当前结点的前驱结点
        final Node predecessor() throws NullPointerException {
        	// 1. 得到前驱结点
            Node p = prev;
            // 2. 若 prev 为空则抛出空指针异常
            if (p == null)
                throw new NullPointerException();
                // 3. prev 不为空则返回
            else
                return p;
        }
		// 无参构造用于初始化 head,或设置 SHARED 标识
        Node() {    // Used to establish initial head or SHARED marker
        }
		// thread, mode 用于 addWaiter()
        Node(Thread thread, Node mode) {     // Used by addWaiter
            this.nextWaiter = mode;
            this.thread = thread;
        }
		// thread, waitStatus 用于 Condition 条件队列
        Node(Thread thread, int waitStatus) { // Used by Condition
            this.waitStatus = waitStatus;
            this.thread = thread;
        }
    }
2.2 ConditionObject

ConditionObject 实现了 Condition 接口

条件模式留到下一张博客

3 AQS 属性

head,tail, state

	// head 队列头结点,dummy node 不代表任何线程,setHead() 修改
    private transient volatile Node head;
    // 队列尾结点,enq() 修改
    private transient volatile Node tail;

    // 同步状态
    private volatile int state;
	// 返回 state 状态
    protected final int getState() {
        return state;
    }
	// 设置 state 状态
    protected final void setState(int newState) {
        state = newState;
    }
4 AQS 独占模式

同步资源的状态:独占,共享

独占模式的功能:获取同步资源,释放同步资源

4.1 acquire() 独占式获取同步资源
public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

实现流程:

  1. 调用 tryAcquire() 尝试获取同步资源,一般在AQS子类实现
  2. 调用 addWaiter() 将当前调用 acquire() 方法的线程入队
  3. 调用 acquireQueued() 在等待队列中获取同步资源

问题1:验证上面 3. 等待队列中获取同步资源是否准确?

问题2:线程中断是如何处理的?注释部分 – ignoring interrupts

4.1.1 tryAcquire( )

tryAcquire() 具体逻辑需要 AQS 子类实现

protected boolean tryAcquire(int arg) {
    throw new UnsupportedOperationException();
}
4.1.2 addWaiter( )

addWaiter() 将未获取到同步资源使用权的线程加入同步队列中

// mode Node.EXCLUSIVE 独占, Node.SHARED 共享
private Node addWaiter(Node mode) {
	// 1. 用当前线程创建一个Node 结点
    Node node = new Node(Thread.currentThread(), mode);
    // 2. 获取队列的尾节点
    Node pred = tail;
    // 3.1 若当前队列非空
    if (pred != null) {
    	// 3.1.1 node 结点的 prev 引用指向 tail
        node.prev = pred;
        // 3.1.2 CAS  *** 作设置node 为新的 tail,若失败则执行 enq()
        if (compareAndSetTail(pred, node)) {
        	//3.1.3 CAS 成功,将 prev [old tail] 的 next 引用指向 node
            pred.next = node;
            // 3.1.4 返回 new tail
            return node;
        }
    }
    // 3.2 当前队列为空,或队列非空时通过CAS入队尾失败[多线程竞争入队]
    // 通过 enq() 方法自旋
    enq(node);
    // 4. 返回入尾成功的 new tail
    return node;
}

注意:mode {Node.EXCLUSIVE, Node.SHARED} 即独占模式和共享模式都是通过 addWaiter() 添加结点到同步队列的

addWaiter(Node mode) 实现流程:

  1. 将当前线程封装到 Node 中,通过 Thread.currentThread() 获取当前线程
Node(Thread thread, Node mode) {     // Used by addWaiter
    this.nextWaiter = mode;
    this.thread = thread;
}

问题:为什么将Node.nextWaiter 设置为 mode,即 Node.EXCLUSIVE

共享模式:作为判断后继结点是否是传播模式

if (s == null || s.isShared())
	doReleaseShared();

final boolean isShared() {
    return nextWaiter == SHARED;
}
  1. 队列非空使用CAS将Node 添加到队尾
  2. 队尾为空使用 enq() 添加Node【多了初始化队列的处理】
4.1.2.1 compareAndSetTail(tail, node)
// cas (MemoryValue, OldValue, ModifyValue) 
private final boolean compareAndSetTail(Node expect, Node update) {
    return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
}

CAS 添加结点入尾流程:

  1. 通过 this, tailOffset 获取 MemoryValue
    public final native boolean compareAndSwapObject(Object var1, long var2, Object var4, Object var5);
  2. 机制 MemoryValue == OldValue ? OldValue = ModifyValue : cas
4.1.2.2 enq()

① 若队列为空,CAS 设置头结点【会执行 enq() 说明队列为空,会执行 if 分支】

② CAS 设置尾结点

private Node enq(final Node node) {
	//1. 若if(cas)失败则继续执行for(;;)
    for (;;) {
    	// 2. 获取尾结点
        Node t = tail;
        // 3.1 队列为空
        if (t == null) { // Must initialize
        	// 3.1.1 CAS 创建头结点
            if (compareAndSetHead(new Node()))
            	// 3.1.2 CAS 成功,队列中仅有一个结点,头结点 = 尾结点
                tail = head;
        } else {
        	// 3.2 队列非空
            node.prev = t;
            // 3.2.1 CAS 设置 node = new tail
            if (compareAndSetTail(t, node)) {
            	// 3.2.2 old tail 的 next 引用指向 node
                t.next = node;
                // 自旋结束,返回原尾结点
                return t;
            }
        }
    }
}
4.1.3 acquireQueued( )

线程成功进入同步队列后,通过 acquireQueued() 方法在同步队列中等待同步资源释放

// 结点加入队列后,尝试在同步队列中自旋获得资源

final boolean acquireQueued(final Node node, int arg) {
	// 标记是否成功获取到资源, 置为 false 标识获取资源成功,finally 中 true 取消获取
    boolean failed = true;
    try {
    	// 标记线程是否被中断
        boolean interrupted = false;
        for (;;) {
        	// 1. 获取 node 的前驱结点
            final Node p = node.predecessor();
            // 2. node 的前驱是 head, 则head释放资源后,node尝试获取资源
            if (p == head && tryAcquire(arg)) {
            	// 2.1 node 获取资源成功,设置 node 为新的头结点
                setHead(node);
                // 2.2 原头结点的后继结点 next 设置为 null, 便于进行 GC
                p.next = null; // help GC
                // 2.3 标记已经获取到资源
                failed = false;
                // 2.4 返回 interrupted 状态
                return interrupted;
            }
            // 3.1 检查并更新无法获取资源的结点的状态
            // 3.2 阻塞线程,并检查中断状态
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
    	// 4. failed = true 且跳出for循环,说明抛出异常,即 p = null, node 为头结点
        if (failed)
        	// 4.1 取消对资源的获取
            cancelAcquire(node);
    }
}

acquireQueued() 执行流程:

  1. 获取 node 的前驱结点p,若p为头节点说明p已经获取到资源,封装 node的线程尝试获取资源
  2. 若前驱结点p == null,说明 node = head,不会执行 if 而是直接跳出 for(; ;)执行 finally 取消获取资源
  3. 若前驱结点p != null && p != head,则判断是否要阻塞当前线程,以及检查当前线程的中断标识
  4. 获取未取消获取同步资源的前驱结点的 waitStatus,自旋直到 p.waitStatus = Node.SIGNAL,然后阻塞线程
  5. 检查node线程的中断标志位,若中断则保留中断标识

说明:从 5可以回答上述:acquire() ignoring interrupts的问题:即在获取同步资源的过程中不处理中断,仅仅是保留中断标识

4.1.3.1 predecessor()

predecessor() 获取结点的前驱结点,若获取的前驱节点为空,方法会抛出 NullPointerException 异常

final Node predecessor() throws NullPointerException {
    Node p = prev;
    if (p == null)
        throw new NullPointerException();
    else
        return p;
}
4.1.3.2 setHead()

执行 setHead() 设置 node 为 head 前提:

① node 的 prev 为 head

② node.tryAcquire() = true

private void setHead(Node node) {
    head = node;
    // head 为 dummy 结点,其对应的 therad 属性为null
    // 这是上述判断 p = null 即 node = head 时取消获取资源的原因
    node.thread = null;
    node.prev = null;
}
4.1.3.3 shouldParkAfterFailedAcquire()

线程获取资源失败后,判断是否阻塞线程

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
	// 1. 获取前驱结点的状态, 从而判断是否阻塞线程
    int ws = pred.waitStatus;
    // 2.1 若 prev 设置为 SIGNAL 
    if (ws == Node.SIGNAL)
        // 2.1.1 prev 已经设置了请求释放的状态,则阻塞当前线程
        return true;
    // 2.2 waitStatus = CANCELLED > 0 即 prev 取消等待资源[超时或中断]
    if (ws > 0) {
        // 2.2.1 跳过所有 CANCELLED 的 prev 结点
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        // 2.2.2 设置最近一个 waitStatus < 0 的结点为 node 的 prev
        pred.next = node;
    } else {  // 1.3 waitStatus = 0 || waitStatus = -3
        // 2.3.1 CAS 设置 prev 的 waitStatus = Node.SIGNAL
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    // 3. 返回 false, 不阻塞线程,返回 acquireQueued() 进行下一次自旋
    return false;
}
4.1.3.4 parkAndCheckInterrupt()

阻塞线程,当线程恢复后判断线程是否中断

private final boolean parkAndCheckInterrupt() {
	// 1. park() 使当前线程进入 waiting 状态,可通过 unpark(), interrupt() 唤醒该线程
    LockSupport.park(this);
    // 2. 返回线程是否被中断,会清除中断标识
    return Thread.interrupted();
}
4.1.4 selfInterrupt( )

若 parkAndCheckInterrupt() = true 则 acquireQueue() 设置 interrupted = true 并返回 interrupted 的状态

回到 acquire() 若 获取同步资源失败 tryAcquire(arg) = false && acquireQueued(node, arg) = true则会执行 selfInterrupt() 方法

acquireQueue(node,arg) 方法中,仅当node获取资源成功后才会返回 interrupted

而 interrupted 状态仅在线程阻塞后执行 parkAndCheckInterrupt()才会更新

这也对应了 acquire(arg)再获取到同步资源后才处理中断

// 设置线程中断,并不对中断做出响应
static void selfInterrupt() {
    Thread.currentThread().interrupt();
}

总结: acquire() 方法的执行流程

  1. 先通过 tryAcquire()尝试获取锁【该方法由子类实现】
  2. 获取失败,将线程封装成node添加到同步队列,在同步队列中循环调用 tryAcquire()获取锁
  3. 循环调用for(;;)即自旋,若前驱已经请求释放资源则停止自旋,执行parkAndCheckInterrupt阻塞线程

问题:线程执行 parkAndCheckInterrupt() 之后,是进入等待队列吗?还是继续在同步队列?

问题:阻塞与等待的区别?

上述 acquireQueued(node, arg)的流程和synchronized 锁从轻量级锁膨胀为重量级锁的过程
还有验证下 其中的参数 arg 是否用于重入锁

问题:结点设置为 head 之后,其 waitStatus, 结点加入同步队列后的 waitStatus

① 封装结点到Node

Node(Thread thread, Node mode) {     // Used by addWaiter
   this.nextWaiter = mode; // node.nextWaiter = Node.EXCLUSIVE
   this.thread = thread;
}

② 判断是否阻塞线程 shouldParkAfterFailedAcquire(pred, node)

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
			...
            return true;
        if (ws > 0) {
			...
        } else {
			....
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

综合① ② ,说明:结点加入同步队列时并没有设置 waitStatus, 也即该值是默认0,当该结点作为前驱结点,后继结点需要获取前驱结点的 waitStatus时,才将该值设置为 Node.SIGNAL

问题:结点是怎么被设置为head的呢?

// acquireQueued(node, arg)
if (p == head && tryAcquire(arg)) {
                    setHead(node);

结点的前驱为头结点,并且该结点获取资源成功

而在setHead(node) 的逻辑里,将 head.thread = null 即将该线程释放出来

那么此时 head 的 waitStatus 呢?或许应该从获取到资源后的释放逻辑来看

翻到 4.2.2 unparkSuccessor() 可以看到通过CAS设置head.waitStatus = 0

// ① release(arg)
if (h != null && h.waitStatus != 0)
        	// 1.2.1 头结点不为空且在同步队列中,则唤醒后继结点,让后继结点竞争资源
            unparkSuccessor(h);
// ② unparkSuccessor(h)
if (ws < 0)
    	// 2.1 CAS 将结点状态修改为正常结点初始化状态
        compareAndSetWaitStatus(node, ws, 0);

综上来说,结点 waitStatus 的变化过程为:

  1. 初始时为默认值 node.waitStatus = 0
  2. 执行 tryAcquire() 失败到 addWaiter() 该值未变化
  3. 执行 acquireQueued() 该节点的后继结点在队列中自旋获取同步资源,设置 node.waitStatus = Node.SIGNAL
  4. 获取到资源后释放资源唤醒后继结点时,设置 node.waitStatus = 0,此时 node = head

问题:线程是如何被唤醒,获取资源的?

4.2 release() 独占模式下释放资源

release() 独占模式下释放资源

public final boolean release(int arg) {
	//1. 判断 tryRelease() 释放资源成功 ?
    if (tryRelease(arg)) {
    	// 1.1 释放资源成功,获取同步队列头结点
        Node h = head;
        // 1.2 判断头结点和头节点的 waitStatus [0 正常同步结点的初始化值]
        if (h != null && h.waitStatus != 0)
        	// 1.2.1 头结点不为空且在同步队列中,则唤醒后继结点,让后继结点竞争资源
            unparkSuccessor(h);
            // 1.3 释放资源成功返回 true, 即使if条件不满足
        return true;
    }
    //2. tryRelease() 释放独占资源失败,返回 false
    return false;
}
4.2.1 tryRelease()

tryRelease() 尝试释放同步资源,释放成功返回 true,需要AQS的子类实现

protected boolean tryRelease(int arg) {
    throw new UnsupportedOperationException();
}
4.2.2 unparkSuccessor()

unparkSuccessor() 唤醒等待队列中的后继结点

private void unparkSuccessor(Node node) {
    // 1. ws 结点的状态
    int ws = node.waitStatus;
    // 2. ws < 0 结点处于 {SIGNAL, CONDITION, PROPAGATE}
    if (ws < 0)
    	// 2.1 CAS 将结点状态修改为正常结点初始化状态
        compareAndSetWaitStatus(node, ws, 0);
    // 3. 唤醒后继结点 s    
    Node s = node.next;
    // 3.1 若后继s 为空, 或取消获取资源{CANCELLED}
    if (s == null || s.waitStatus > 0) {
    	// 3.1.1 将 s 置为空
        s = null;
        // 3.1.2 从 tail 向前遍历找到 node 后的第一个待唤醒的结点
        for (Node t = tail; t != null && t != node; t = t.prev)
        	//循环结束条件为 t == null 或者 t = node,由后向前遍历时 s 会被多次赋值,直到找到靠近 node 的非空后继才会停止循环
            if (t.waitStatus <= 0)
                s = t;
    }
    // 3.2 后继 s 非空,唤醒 s 对应的线程
    if (s != null)
        LockSupport.unpark(s.thread);
}

作业:总结 unparkSuccessor(node) 方法的执行流程

  1. release(arg) 方法中获取 head 后,将其作为入参调用 unparkSuccessor(node),node = head
  2. CAS 设置 head.waitStatus = 0
  3. 获取 head 的第一个非空且未取消的结点
  4. 若后继结点取消获取资源,则需要从后向前遍历找到离 head 最近的未取消且不为null的结点
  5. 若后继为空则不必执行 unpark(thread)

同步队列的头节点为虚节点dummy node,节点的 thread = null,AQS 中维护的同步队列除去头节点以外的部分才是真正等待资源的队列,即使同步队列是空的,包装 thread的Node也会排在同步队列的第二位

作业:同步队列存储示意图

5 AQS 共享模式

AQS 独占模式和共享模式的区别:同一时刻能否有多个线程获取同步资源

5.1 acquireShared() 共享模式获取资源

acquireShared() 共享模式获取资源,可用于实现 lock

public final void acquireShared(int arg) {
	// 1. tryAcquireShared(arg) 获取到共享资源会返回正数
    if (tryAcquireShared(arg) < 0)
    	// 2. 没有获取到资源执行 if中的 doAcquireShared(arg)
        doAcquireShared(arg);
}

独占模式:acquire(arg)->tryAcquire(arg)->addWaiter(Node.EXCLUSIVE,arg)->acquireQueued(node)->selfInterrupt()

共享模式:acquireShared(arg)->tryAcquireShared(arg)->doAcquireShared(arg)

5.1.1 tryAcquireShared(arg)

tryAcquireShared(arg) 以共享模式尝试获取资源,由子类实现

protected int tryAcquireShared(int arg) {
    throw new UnsupportedOperationException();
}

作业:子类实现 tryAcquireShared(arg) 的流程

① ReentrantLock() 中是否有该方法的实现?

② Semaphore,CountDownLatch,CyclicBarrier,ReentrantReadWriteLock 对该方法的实现的区别

5.1.2 doAcquireShared()

doAcquiredShared() 使线程进入同步队列,共享不可中断模式

private void doAcquireShared(int arg) {
		// 1. 通过CAS自旋方式入尾,与独占模式的区别:传入参数 
		// 独占模式:Node.EXCLUSIVE, 共享模式:Node.SHARED
        final Node node = addWaiter(Node.SHARED);
        // 2. 标记是否成功获取到资源,为 head 节点时 failed = true 取消获取资源
        boolean failed = true;
        try {
        	// 3. 标记是否中断
            boolean interrupted = false;
            // 4. 自旋
            for (;;) {
            	// 4.1 获取node的前继节点 p
                final Node p = node.predecessor();
                // 4.2 若 p 为 head, 则尝试获取共享资源
                if (p == head) {
                	// 4.2.1 p 为 head 尝试获取共享资源
                    int r = tryAcquireShared(arg);
                    // 4.2.2 获取共享资源成功,独占模式是 boolean
                    if (r >= 0) {
                    	// 4.2.2.1 将 node 设置为 head,检查后继节点是否在共享模式下等待
                        setHeadAndPropagate(node, r);
                        // 4.2.2.2 原头节点的next引用置为 null,便于GVM对头节点进行GC
                        p.next = null; // help GC
                        // 4.2.2.3 若线程被中断
                        if (interrupted)
                        	// 4.2.2.3.1 维护中断状态
                            selfInterrupt();
                        // 4.2.2.4 标记成功获取资源
                        failed = false;
                        // 4.2.2.5 返回,退出函数
                        return;
                    }
                }
                // 4.3 若node的前驱不是头节点或者node获取共享资源失败
                if (shouldParkAfterFailedAcquire(p, node) &&
                	// 4.3.1 检查并更新无法获取资源的节点状态
                    parkAndCheckInterrupt())
                    // 4.3.2 阻塞线程,并检查中断状态
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

与独占模式的 acquireQueued(arg) 代码逻辑相同

共享模式与独占模式的不同:

① addWaiter(Node mode) 放到 doAcquireShared(arg) 中执行

② tryAcquireShared(arg) 返回int值,semphore, countDownLatch, ReentrantReadWriteLock 均重写了该方法,用于针对执行情况做相应的处理

③ setHeadAndPropagate(node, r) 可传播模式设置 head 结点,共享模式下不需要等待释放资源再唤醒节点,因此若后继结点在共享模式下等待,可以接续设置 head结点

④ doAcquireShared(arg) 不返回中断状态,而独占模式下是获取到资源后判断中断表示,决定是否中断

问题:独占模式与共享模式下结点加入同步队列是否有区别?
目前看来好像没区别,都是调用addWaiter(mode),区别在入参,执行逻辑是相同的

// addWaiter(mode)
Node node = new Node(Thread.currentThread(), mode);
// class Node
Node(Thread thread, Node mode) {     // Used by addWaiter
    this.nextWaiter = mode;
    this.thread = thread;
}

而 mode 用于设置 node.nextWiter

5.1.3 setHeadAndPropagate(node, r)

共享模式中,获取资源成功的线程调用 setHeadAndPropagate(node, r)

设置队列的头节点,并检查后继节点是否是共享模式

private void setHeadAndPropagate(Node node, int propagate) {
		// 1. 获取队列的头节点
        Node h = head; // Record old head for check below
        // 2. 设置当前节点为头节点
        setHead(node);
        // 3. 唤醒后继结点
        if (propagate > 0 || h == null || h.waitStatus < 0 ||
            (h = head) == null || h.waitStatus < 0) {
			// 3.1 获取当前结点 node 的后继结点 s
            Node s = node.next;
            // 3.2 当前节点node 的后继 s = null 或者 s 是共享模式
            if (s == null || s.isShared())
            	// 3.3.1 唤醒后继结点
                doReleaseShared();
        }
    }

唤醒后继结点的三种条件:

① propagate > 0 后续结点需要被唤醒,propagate = r 获取共享资源成功 r > 0

h == null || h.waitStatus < 0此处h = old head头结点为空或 waitStatus < 0 非取消状态

h = head, 执行了setHead(node)此处 h = new head当前结点为空或 waitStatus < 0非取消获取共享资源

思考:上述 无论是 old head ,new head, 还是后继 结点若为null 均执行唤醒后继结点的原因?

共享模式下:当前线程拿到资源后不必等到释放资源再通知后继结点,为null的时候不占有资源,自然可以通知后继结点

5.2 releaseShared() 共享模式释放资源

releaseShared() 共享模式释放资源

public final boolean releaseShared(int arg) {
	// 1. 尝试释放共享资源
    if (tryReleaseShared(arg)) {
    	// 2. 尝试释放资源成功后,调用 doReleaseShared() 唤醒后继结点
        doReleaseShared();
        return true;
    }
    // 2. 尝试释放共享资源失败返回 false
    return false;
}
5.2.1 tryReleaseShared()

tryReleaseShared() 由子类实现

protected boolean tryReleaseShared(int arg) {
   throw new UnsupportedOperationException();
}
5.2.2 doReleaseShared()

tryReleaseShared() 执行成功,执行 doReleaseShared() 唤醒后继结点

private void doReleaseShared() {
    // 自旋释放后继结点
    for (;;) {
    	// 1. 自旋,动态获取队列头节点
        Node h = head;
        // 2. 头结点非空且不等于尾结点[队列中至少有两个结点]
        if (h != null && h != tail) {
        	// 2.1 获取头节点的 waitStatus -- ws
            int ws = h.waitStatus;
            // 2.2 头节点状态为 SIGNAL
            if (ws == Node.SIGNAL) {
            	// 2.2.1 CAS 设置头结点状态为 0 ,若失败则继续进行下次循环
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;            // loop to recheck cases
               	// 2.2.2 CAS 成功,唤醒后继结点
                unparkSuccessor(h);
            }
            // 2.3 头结点状态为 0 [正常结点初始化值]
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                     // 2.3.1 在头结点状态为 0 的基础上设置头结点状态为 Node.PROPAGATE [可向后传播唤醒]
                continue;                // loop on failed CAS
                // 2.3.2 CAS 失败继续 进行下次循环
        }
        // 3. 自旋跳出条件,若head不变就跳出自旋,若head一直变化就一直自旋
        if (h == head)                   // loop if head changed
            break;
    }
}

问题:共享模式下 node.waitStatus 的状态机

  1. 初始时为默认值 node.waitStatus = 0
  2. 执行 tryAcquireShared() 失败到执行 doAcquireShared() 该值未变化
  3. 执行 shouldParkAfterFailedAcquire(pred, node) CAS设置 node.waitStatus = Node.SIGNAL
  4. 释放资源时CAS 设置 head.waitStatus = 0
  5. 或者未执行3即未阻塞node 时,设置 node.waitStatus = Node.PROPAGATE

问题:共享模式下执行 shouldParkAfterFailedAcquire(pred, node) 会执行吗?或者说 共享模式下 node.waitStatus会否设置为 Node.SIGNAL

当前驱结点不是 head,或者执行 tryAcquireShared(arg)失败的时候会执行

问题:doReleaseShared() 的处理流程

  1. 获取队列头结点,若头节点不为空且队列不为空则获取头结点的 waitStatus
  2. 若头节点 h.waitStatus = Node.SIGNAL 说明后继结点执行了 shouldParkAfterFailedAcquire(pred, node)则CAS 修改 h.waitStatus = 0 并解除后继结点的阻塞状态 【注意:执行完之后并没有退出循环】
  3. 若头结点 h.waitStatus = 0,说明执行的是 setHeadAndPropagate(node, r)中的 doReleaseShared()
    设置 h.waitStatus = Node.PROPAGATE 并继续循环
  4. 循环退出条件 h == head

问题:上述流程中好像并没有修改 head, 所以会直接退出 doReleaseShared()吗?那为什么不直接在解除后继结点的阻塞时就退出循环呢?

思考这个问题,一定要放在多线程的角度下,

① 多个线程执行 acquireShared(),如果尝试获取资源失败都会进入到doAcquireShared(arg)

public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}

② 一旦尝试获取资源失败都会调用 addWaiter(Node.SHARED)到同步队列中排队

private void doAcquireShared(int arg) {
        final Node node = addWaiter(Node.SHARED);


③ 加入到同步队列中后所有结点均自旋获取同步资源,只有前驱结点为 head 的结点才能执行tryAcquireShared(arg)获取同步资源

for (;;) {
    final Node p = node.predecessor();
    if (p == head) {
        int r = tryAcquireShared(arg);

④ 若前驱为head的结点执行tryAcquireShared(arg)<0,即失败

则阻塞当前结点的后继结点shouldParkAfterFailedAcquire(p, node),而该结点继续自旋执行 tryAcquireShared(arg)直到获取资源成功

for (;;) {
    final Node p = node.predecessor();
    if (p == head) {
        int r = tryAcquireShared(arg);
        if (r >= 0) {
        ....
            return;
        }
    }
    if (shouldParkAfterFailedAcquire(p, node) &&
        parkAndCheckInterrupt())
        interrupted = true;
}

⑤ 若前驱为head的结点执行 tryAcquireShared(arg)成功,即获得同步资源

则执行setHeadAndPropagate(node, r)

// doAcquireShared(arg)
if (r >= 0) {
   setHeadAndPropagate(node, r);
// setHeadAndPropagate(Node node, int propagate) 
private void setHeadAndPropagate(Node node, int propagate) {
    Node h = head; // Record old head for check below
    setHead(node);

自此除了 mode 不同外,基本和独占模式的处理流程相同

那么不同在哪里呢?

回到 5.1.2 节可以看到,区别在于:

独占模式下执行:setHead(node)

共享模式下执行:setHeadAndPropagate(node, r)

相同点都是:设置成功获取资源的结点为head结点,这样后继结点比如node2由于前驱为node1=head就可以执行tryAcquire(arg)tryAcquireShared(arg)获取同步资源了

但是 setHeadAndPropagate(node, r)又执行了释放资源的 *** 作doReleaseShared()

假设当前已获取同步资源的结点是node1,则node2开始执行流程3-5,同样会执行doReleaseShared()

也同样会引起下一个结点node3执行 doReleaseShared()

如此:必然需要判断 head是否变化,若 head一直动态变化,则继续自旋解除阻塞的后继结点,或者在未阻塞模式下设置 Node.PROPAGATE,确保该传播模式继续进行。

后记:后面本来还有 条件模式,但是内容太多了,而且还没有理解,所以放到下一章总结

真的感谢周冠亚老师写这本书,还有CSDN写博客的平台,一遍是不行的,还有很多没理解,等到把相关的工具类也总结后,再来2-3遍,纠正理解不当之处,同时进一步发掘问题

欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/langs/923330.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-05-16
下一篇 2022-05-16

发表评论

登录后才能评论

评论列表(0条)

保存