JDK8 AbstractQueuedSynchronizer(AQS) 源码 之 独占模式与共享模式

JDK8 AbstractQueuedSynchronizer(AQS) 源码 之 独占模式与共享模式,第1张

目录
  • 一、前言
  • 二、带着问题看源码
    • 2.1 源码里独占模式和共享模式字段对应的值是什么,这样设计带来的影响是什么
    • 2.2 Node.waitStatus在AQS中有哪些状态,代表意义,设置或变更时机是什么
    • 2.3 acquire/acquireInterruptibly/tryAcquireNanos三者的区别是什么
  • 三、public方法
    • 3.1 独占模式
      • 3.1.1 简略流程图
        • 3.1.1.1 acquire方法
        • 3.1.1.2 acquireInterruptibly方法
        • 3.1.1.3 tryAcquireNanos方法
        • 3.1.1.4 release方法
      • 3.1.2 方法列表
    • 3.2 共享模式
      • 3.2.1 简略流程图
        • 3.2.1.1 acquireShared
        • 3.2.1.2 releaseShared
      • 3.2.2 共享模式用到,独占模式没用到的方法列表
  • 四、独占模式源码分析
    • 4.1 acquire 可以被外部类调用的public方法
    • 4.2 tryAcquire 需要被子类重写(这里以ReentrantLock为例)
    • 4.3 addWaiter 将当前线程包装成节点并入队
    • 4.4 enq 初始化头结点并将当前节点入队
    • 4.5 acquireQueued 线程在当前方法循环阻塞、唤醒、尝试获取资源、失败阻塞,成功退出的过程
    • 4.6 setHead 设置头结点,头结点占有线程
    • 4.7 shouldParkAfterFailedAcquire 判断获取资源失败后是否应该获取资源。主要工作:修改前驱节点的状态为SIGNAL,清除队列中被取消的节点(waitStatus为CANCELLED)
    • 4.8 parkAndCheckInterrupt 阻塞当前线程,当其唤醒后,会返回当前线程是否被中断,并清除中断标记位
    • 4.9 cancelAcquire 当前节点设为CANCELLED,并移除同步等待队列
    • 4.10 selfInterrupt 当前线程设置中断标记
    • 4.11 unparkSuccessor 唤醒后继者
    • 4.12 tryAcquireNanos 尝试指定时间内能否获取锁。可以被外部类调用的public方法
    • 4.13 doAcquireNanos 获取锁,等待指定时间,超时被唤醒再次尝试获取资源失败后返回false
    • 4.1.4 acquireInterruptibly 可中断地获取资源。可以被外部类调用的public方法
    • 4.15 doAcquireInterruptibly 可中断地获取资源。
    • 4.16 release 释放资源/释放锁。可以被外部类调用的public方法
    • 4.17 tryRelease 需要被子类重写。尝试释放资源(这里以ReentrantLock为例)
  • 五、共享模式源码分析

一、前言

  AQS逻辑分为三部分:独占模式(锁标识为EXCLUSIVE)、共享模式(锁标识为SHARED)、条件队列(也就是内部类ConditionObject里的逻辑)。

  其中,条件队列依附于独占模式(只有独占模式情况下才会用到ConditionObject)。本文主要讲独占模式及共享模式下涉及到的源码。

  共享模式和独占模式在state资源数上赋予的意义不同,共享模式的state类似于许可数,值为0时则无法再获取锁。而独占模式的state类似于某线程占用该锁的次数,次数为0时才可以获取锁。

  本文适合在理解Node内部类、CAS后阅读

二、带着问题看源码 2.1 源码里独占模式和共享模式字段对应的值是什么,这样设计带来的影响是什么 2.2 Node.waitStatus在AQS中有哪些状态,代表意义,设置或变更时机是什么 2.3 acquire/acquireInterruptibly/tryAcquireNanos三者的区别是什么 三、public方法 3.1 独占模式

独占模式有四个公共方法:
获取资源:acquire,acquireInterruptibly,tryAcquireNanos
释放资源:release

3.1.1 简略流程图 3.1.1.1 acquire方法

3.1.1.2 acquireInterruptibly方法

3.1.1.3 tryAcquireNanos方法

3.1.1.4 release方法

3.1.2 方法列表
  • acquire
  • tryAcquire
  • addWaiter
  • enq
  • acquireQueued
  • setHead
  • shouldParkAfterFailedAcquire
  • parkAndCheckInterrupt
  • cancelAcquire
  • unparkSuccessor
  • LockSupport.unpark
  • tryAcquireNanos
  • doAcquireNanos
  • acquireInterruptibly
  • doAcquireInterruptibly
  • release
  • tryRelease
3.2 共享模式

共享模式同样有四个公共方法:
获取资源:acquireShared,acquireSharedInterruptibly,tryAcquireSharedNanos
释放资源:releaseShared

共享模式下的四个公共方法基本逻辑与独占模式相同,最主要的不同点在于获取到资源时的处理不同setHeadAndPropagate

3.2.1 简略流程图 3.2.1.1 acquireShared

3.2.1.2 releaseShared

3.2.2 共享模式用到,独占模式没用到的方法列表
  • acquireShared
  • tryAcquireShared
  • doAcquireShared
  • setHeadAndPropagate
  • doReleaseShared
  • doAcquireSharedInterruptibly
  • doAcquireSharedNanos
四、独占模式源码分析 4.1 acquire 可以被外部类调用的public方法
	/**
     * 尝试获取arg个资源数,如果失败,就包装成独占锁的节点进入队列。
     * 如果进入队列后直接能获取到资源,则结束
     */
    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
                acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
4.2 tryAcquire 需要被子类重写(这里以ReentrantLock为例)

用于被子类重写,判断当前能否直接获取到锁。

    protected boolean tryAcquire(int arg) {
        throw new UnsupportedOperationException();
    }

以ReentrantLock重写的非公平方法为例。先获取当前占用资源数,判断如果是0,或者当前线程已经获取过了,则获取成功,否则获取锁失败,返回false。

/**
    * 尝试以非公平方式获取资源
    */
   protected final boolean tryAcquire(int acquires) {
       return nonfairTryAcquire(acquires);
   }
   /**
    * 非公平尝试获取锁
    * 当发现资源未上锁,cas尝试获取
    * 否则判断是否已拥有,重入锁。
    */
   final boolean nonfairTryAcquire(int acquires) {
       final Thread current = Thread.currentThread();
       int c = getState();
       if (c == 0) {
           if (compareAndSetState(0, acquires)) {
               setExclusiveOwnerThread(current);
               return true;
           }
       }
       else if (current == getExclusiveOwnerThread()) {
           int nextc = c + acquires;
           if (nextc < 0) // overflow
               throw new Error("Maximum lock count exceeded");
           setState(nextc);
           return true;
       }
       return false;
   }
4.3 addWaiter 将当前线程包装成节点并入队
	/**
     * 1.将当前节点封装成对应模式的节点
     * 2.cas将当前节点设置成尾节点
     * @param mode 节点模式,这里传的独占模式Node.EXCLUSIVE
     * @return 包装后的节点
     */
    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // 如果队列中尾指针不为空,尝试将当前节点放到尾节点上,并且cas尝试将尾指针指向当前节点
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        // 不断尝试入队
        enq(node);
        return node;
    }

可以看到,Node构造函数传了两个参数:当前线程和mode模式。
mode模式有两种:

  • 共享锁标识 static final Node SHARED = new Node();
  • 独占锁标识 static final Node EXCLUSIVE = null;

nextWaiter被赋值为mode,说明如果使用了等待队列,那么当前模式一定是独占模式的(独占模式下nextWaiter才可以被赋值,共享模式不可以被赋值,赋值后无法判断是否为共享模式)

 		/**
         * 构造
         */
        Node(Thread thread, Node mode) {     
            this.nextWaiter = mode;
            this.thread = thread;
        }
4.4 enq 初始化头结点并将当前节点入队
 /**
     * 如果队列为空,就建头结点
     * 尝试将当前节点入队
     * @param node 当前节点
     * @return 原先的尾节点
     */
    private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            // 如果尾节点为null,说明队列为空,需要新建头结点,然后下次循环将当前节点加进去
            if (t == null) { 
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                // 有头结点则cas设置尾节点
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }
4.5 acquireQueued 线程在当前方法循环阻塞、唤醒、尝试获取资源、失败阻塞,成功退出的过程
	/**
     * 将同步队列中的节点阻塞,被唤醒后则再次尝试获取资源,失败再阻塞,处于循环中
     * 直到抛出异常或者线程获取到了资源
     */
    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                // 获取前序节点,如果前序节点是头,并且当前线程能够获取到资源,则将当前节点变为头,并且使之前的头脱离队列
                final Node p = node.predecessor();
                // 如果p是头结点(说明node是队列中除了头的唯一节点)并且尝试获取资源成功
                if (p == head && tryAcquire(arg)) {
                    // 设置当前节点为头结点,并且将原来的头移除队列
                    setHead(node);
                    // 原来头的后继指向null
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                // 如果前序节点不是头,或者获取资源失败,
                // 正常来讲,shouldParkAfterFailedAcquire这里node为队尾,p为node的前驱节点。
                // 如果p的waitStatus为0,设为SIGNAL,返回false进入下轮循环。
                // 如果为SIGNAL返回true进入parkAndCheckInterrupt
                if (shouldParkAfterFailedAcquire(p, node) &&
                        // 在这里阻塞,被唤醒后返回当前线程中断状态并清除中断标记。如果被中断过返回true,进入if方法内部
                        parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            // try抛异常则取消获取资源
            if (failed)
                cancelAcquire(node);
        }
    }
4.6 setHead 设置头结点,头结点占有线程
 /**
     * 设置某节点为头结点
     * @param node 一般是队首节点(默认头结点除外)
     */
    private void setHead(Node node) {
        head = node;
        // 头结点不占有线程
        node.thread = null;
        node.prev = null;
    }
4.7 shouldParkAfterFailedAcquire 判断获取资源失败后是否应该获取资源。主要工作:修改前驱节点的状态为SIGNAL,清除队列中被取消的节点(waitStatus为CANCELLED)
 /**
     * 判断获取资源失败后是否应该获取资源。主要工作:修改前驱节点的状态为SIGNAL,清除队列中被取消的节点(waitStatus为CANCELLED)
     */
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        // 如果前序节点的waitStatus状态为SIGNAL,说明当前节点等待被唤醒,可以park阻塞
        if (ws == Node.SIGNAL)
            return true;
        // 如果是CANCELLED状态,从前序节点往前找,将所有CANCELLED状态的节点移除队列
        if (ws > 0) {
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            // 如果是0、-3、-2,比较并设置前驱节点的状态为SIGNAL
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        // 不进行park阻塞 *** 作
        return false;
    }
4.8 parkAndCheckInterrupt 阻塞当前线程,当其唤醒后,会返回当前线程是否被中断,并清除中断标记位

关于LockSupport可以看这篇文章 LockSupport源码理解

	/**
     * 阻塞当前线程,当其唤醒后,会返回当前线程是否被中断,并清除中断标记位
     */
    private final boolean parkAndCheckInterrupt() {
        // 线程中断或者被unpark都可以被唤醒
        LockSupport.park(this);
        return Thread.interrupted();
    }

Thread.interrupted() 方法会返回中断标记和清除中断标记。true为已中断

4.9 cancelAcquire 当前节点设为CANCELLED,并移除同步等待队列

一个比较令人费解的方法。unparkSuccessor的时机我不太理解。

	/**
     * 将节点移除同步队列,设置当前节点状态为CANCELLED
     * 从当前位置向前找,把遇到的CANCELLED节点都移除,直到不是CANCELLED的节点
     * 如果一直找到头结点,有机会唤醒node的后继节点
     */
    private void cancelAcquire(Node node) {
        if (node == null)
            return;

        node.thread = null;
        // 取到此节点之前最近的状态小于0的节点
        Node pred = node.prev;
        while (pred.waitStatus > 0)
            node.prev = pred = pred.prev;
        // 循环执行结束后,node.prev的waitStatus<=0
        // 记录pred的下一个节点,用于cas设置pred的next
        Node predNext = pred.next;
        // 设置当前节点状态为CANCELLED
        node.waitStatus = Node.CANCELLED;
        // 如果这个节点已经是尾节点,就把尾节点设置为pred(非CANCELLED的节点)
        if (node == tail && compareAndSetTail(node, pred)) {
            // cas设置pred的next为null
            compareAndSetNext(pred, predNext, null);
        } else {
            // If successor needs signal, try to set pred's next-link
            // so it will get one. Otherwise wake it up to propagate.
            // 如果node已经不是尾节点了
            int ws;
            // 防止pred为头结点,并且pred的等待状态为SIGNAL时才能往下走
            // 如果pred不是头结点 并且 (pred.waitStatus为SIGNAL 或 pred.waitStatus<=0且cas设置为SIGNAL成功) 并且 pred.thread不为null
            if (pred != head &&
                    ((ws = pred.waitStatus) == Node.SIGNAL ||
                            (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
                    pred.thread != null) {
                // 进到这里说明node后面被加了节点。所以要将pred的后继指向node的后继
                Node next = node.next;
                if (next != null && next.waitStatus <= 0)
                    compareAndSetNext(pred, predNext, next);
            } else {
                // node != tail && ( pred == head || pred.thread == null || pred.waitStatus!=Node.SIGNAL && (pred.waitStatus>0|| casFailed))
                // pred为头结点,或者 pred被取消了 或者 cas设置pred的状态为SIGNAL失败了?才去唤醒node的后继节点
                unparkSuccessor(node);
            }
            // 为什么这样会help GC呢
            node.next = node; // help GC
        }
    }
4.10 selfInterrupt 当前线程设置中断标记
	/**
     * 当前线程设置中断标记
     */
    static void selfInterrupt() {
        Thread.currentThread().interrupt();
    }
4.11 unparkSuccessor 唤醒后继者

我一直理解唤醒的都是头结点的后继者。但是该方法是唤醒了入参的后继,我还没有找到传参为头之外的情况。

/**
     * 唤醒该节点的后继节点
     * @param node 一般是head节点
     */
    private void unparkSuccessor(Node node) {
        int ws = node.waitStatus;
        // 如果状态小于0,为SIGNAL -1 或 CONDITION -2 或 PROPAGATE -3,将其设置为0
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        Node s = node.next;
        // 下一个结点为空或者下一个节点的等待状态大于0,即为CANCELLED
        if (s == null || s.waitStatus > 0) {
            s = null;
            // 从尾结点开始从后往前开始遍历
            for (Node t = tail; t != null && t != node; t = t.prev)
                // 找到等待状态小于等于0的结点,找到最前的状态小于等于0的结点, 保存结点
                if (t.waitStatus <= 0)
                    s = t;
        }
        // 该结点不为为空,释放许可,在这里阻塞的线程被唤醒
        if (s != null)
            LockSupport.unpark(s.thread);
    }
4.12 tryAcquireNanos 尝试指定时间内能否获取锁。可以被外部类调用的public方法
 /**
     * 尝试指定时间内能否获取锁。
     * @param arg 资源数
     * @param nanosTimeout 相对等待时间
     * @return 是否获取成功
     * @throws InterruptedException 中断异常
     */
    public final boolean tryAcquireNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        // 先尝试获取资源是否成功,失败的话尝试指定时间内是否能获取锁成功
        return tryAcquire(arg) ||
                doAcquireNanos(arg, nanosTimeout);
    }
4.13 doAcquireNanos 获取锁,等待指定时间,超时被唤醒再次尝试获取资源失败后返回false

和requireQueued的区别:

  • 唤醒方式不同:
    • requireQueued: 被其它线程LockSupport.unpark、被中断才会被唤醒
    • doAcquireNanos: 被其它线程LockSupport.unpark、被中断、超时会被唤醒
  • 退出循环方式不同:
    • requireQueued: 被唤醒后,在循环中通过判断当前节点是否为头的后继节点和是否能获取资源来退出循环
    • doAcquireNanos: 被唤醒后,在循环中判断完是否获取资源后,还会判断是否超时来退出循环
  • 中断处理不同:
    • requireQueued: 不处理中断,只返回中断信息
    • doAcquireNanos: 中断后抛出异常
	/***
     * 尝试指定时间内能否获取锁。
     * 实际退出方法时间一定比nanosTimeout时间长
     * @param arg 请求资源数量
     * @param nanosTimeout 相对等待时间
     * @return 是否获取成功
     * @throws InterruptedException 异常
     */
    private boolean doAcquireNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        if (nanosTimeout <= 0L)
            return false;
        // 绝对结束时间
        final long deadline = System.nanoTime() + nanosTimeout;
        final Node node = addWaiter(Node.EXCLUSIVE);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return true;
                }
                nanosTimeout = deadline - System.nanoTime();
                // 如果超时了,直接返回false
                if (nanosTimeout <= 0L)
                    return false;
                // 判断是否需要阻塞,
                // 如果等待时间比自旋时间小,则用for循环等待获取锁。
                // 否则用LockSupport阻塞对应时间,被唤醒后在下一轮循环判断能否获取锁并退出
                if (shouldParkAfterFailedAcquire(p, node) &&
                        nanosTimeout > spinForTimeoutThreshold)
                    LockSupport.parkNanos(this, nanosTimeout);
                // 可以响应中断,抛出异常
                if (Thread.interrupted())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
4.1.4 acquireInterruptibly 可中断地获取资源。可以被外部类调用的public方法
	/**
     * 可中断地获取资源
     * acquire和acquireInterruptibly两者区别是:
     * acquire不响应中断,只标记中断位
     * doAcquireInterruptibly遇到中断则抛出异常
     */
    public final void acquireInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (!tryAcquire(arg))
            doAcquireInterruptibly(arg);
    }
4.15 doAcquireInterruptibly 可中断地获取资源。

和acquireQueued基本完全相同,只是对中断的处理不同。

/**
     * 想想这个方法和acquireQueued有什么区别?
     * 这个方法无返回值。中断直接抛异常
     * acquireQueued返回是否中断,并且交给selfInterrupt标记中断
     * 所以两者区别是:
     * acquire不响应中断,只标记中断位
     * doAcquireInterruptibly遇到中断则抛出异常
     */
    private void doAcquireInterruptibly(int arg)
            throws InterruptedException {
        final Node node = addWaiter(Node.EXCLUSIVE);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
                // 遇到中断则抛出异常
                if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
4.16 release 释放资源/释放锁。可以被外部类调用的public方法
/**
     * 释放资源,如果全部释放,唤醒同步队列头结点之后的节点(不再阻塞)
     */
    public final boolean release(int arg) {
        // 如果尝试释放资源成功(释放资源,如果全部释放,资源不再被锁住)
        if (tryRelease(arg)) {
            Node h = head;
            // 如果头节点不为空且头的waitStatus不为0,释放后继者
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
4.17 tryRelease 需要被子类重写。尝试释放资源(这里以ReentrantLock为例)
 		/**
         * 完全释放锁后,state为0,设置独占线程为空
         * @param releases 释放数量
         * @return 是否成功
         */
        protected final boolean tryRelease(int releases) {
            int c = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
        }
五、共享模式源码分析

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/langs/800043.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-05-06
下一篇 2022-05-06

发表评论

登录后才能评论

评论列表(0条)

保存