抽象队列同步器AQS

抽象队列同步器AQS,第1张

上一篇:并发编程之synchronized详解

抽象队列同步器AQS
  • AQS应用---Lock
    • 并发之父
    • ReentrantLock
    • AQS具备特性
    • 同步等待队列
    • 条件等待队列
    • AQS源码分析

AQS应用—Lock 并发之父


生平不识Doug Lea,学懂并发也枉然
Java并发编程核心在于java.concurrent.util包而juc当中的大多数同步器实现都是围绕着共同的基础行为,比如等待队列、条件队列、独占获取、共享获取等,而这个行为的抽象就是基于AbstractQueuedSynchronizer简称AQS,AQS定义了一套多线程访问共享资源 的同步器框架,是一个依赖状态(state)的同步器。

ReentrantLock

ReentrantLock是一种基于AQS框架的应用实现,是JDK中的一种线程并发访问的同步手段,它的功能类似于synchronized是一种互斥锁,可以保证线程安全。而且它具有比synchronized更多的特性,比如它支持手动加锁与解锁,支持加锁的公平性。

//使用ReentrantLock进行同步
//false为非公平锁, true为公平锁
ReentrantLock lock = new ReentrantLock(false);
lock.lock() //加锁
lock.unlock() //解锁

在ReentrantLock内部定义了一个Sync的内部类 ,改类继承了AbstractQueuedSynchronized,对该抽象类的部分方法做了实现;
并且还定义了两个子类:

  • FairSync 公平锁的实现

  • NonfairSync 非公平锁的实现

  • List item

这两个类都继承自Sync,也就是间接继承了AbstractQueuedSynchronized,所以这一个ReentrantLock同时具备公平与非公平特性。

AQS具备特性
  • 组上我等待队列
  • 共享/独占
  • 公平/非公平
  • 可重入
  • 允许中断

AQS框架实现

  • 一般通过定义内部类Sync继承AQS
  • 将同步器所有调用都映射到Sync对应的方法

AQS内部维护属性volatile int state (32位)

  • state表示资源的可用状态

State三种访问方式

  • getState()、setState()、compareAndSetState()

AQS定义两种资源共享方式

  • Exclusive-独占,只有一个线程能执行
  • Share-共享,多个线程可以同时执行

AQS定义两种队列

  • 同步等待队列
  • 条件等待队列
同步等待队列

AQS当中的同步等待队列也称CLH队列,CLH队列是Craig、Landin、Hagersten三人发明的一种基于双向链表数据结构的队列,是FIFO先入先出线程等待队列,Java中的CLH 队列是原CLH队列的一个变种,线程由原自旋机制改为阻塞机制。

条件等待队列

ondition是一个多线程间协调通信的工具类,使得某个,或者某些线程一起等待某个条件(Condition),只有当该条件具备时,这些等待线程才会被唤醒,从而重新争夺锁。

AQS源码分析
public abstract class AbstractQueuedSynchronizer
        extends AbstractOwnableSynchronizer
        implements java.io.Serializable {

    private static final long serialVersionUID = 7373984972572414691L;

    protected AbstractQueuedSynchronizer() { }

    /**
     * Wait queue node class.
     * 不管是条件队列还是CLH队列都是基于Node类
     * AQS当中的同步等待队列也称CLH队列,CLH队列是Craig、Landin、Hagers ten三人
     * 发明的一种基于双向链表数据结构的队列,是FIFO先入先出线程等待队列,
     * Java中的CLH队列是原CLH队列的一个变种,线程由原自旋机制改为阻塞机制。
     * 
     *      +------+  prev +-----+       +-----+
     * head |      | <---- |     | <---- |     |  tail
     *      +------+       +-----+       +-----+
     * 
*/
static final class Node { /** 共享模式 */ static final Node SHARED = new Node(); /** 独占模式 */ static final Node EXCLUSIVE = null; /** 在同步队列中等待的线程等待超时或者被中断,需要从同步队列中取消等待 */ static final int CANCELLED = 1; /** * 后继节点的线程处于等待状态,而当前的节点如果释放了同步状态或者被取消,将会通知后继节点,使后继节点的线程得以运行。 */ static final int SIGNAL = -1; /** * 节点在等待队列中,节点的线程等待在Condition上,当其他线程对Condit ion调用了signal()方法后,节点会从等待队列中转移到同步队列中,加入到同步状态的获取中 */ static final int CONDITION = -2; /** * 表示下一次共享式同步状态获取将会被无条件地传播下去 */ static final int PROPAGATE = -3; /** * 标记当前节点的信号量状态 (1,0,‐1,‐2,‐3)5种状态 * 使用CAS更改状态,volatile保证线程可见性,高并发场景下, * 即被一个线程修改后,状态会立马让其他线程可见。 */ volatile int waitStatus; /** * 前驱节点,当前节点加入到同步队列中被设置 */ volatile Node prev; /** * 后继节点 */ volatile Node next; /** * 节点同步状态的线程 */ volatile Thread thread; /** * 等待队列中的后继节点,如果当前节点是共享的,那么这个字段是一个SHARED常量, * 也就是说节点类型(独占和共享)和等待队列中的后继节点共用同一个字段。 */ Node nextWaiter; /** * Returns true if node is waiting in shared mode. */ final boolean isShared() { return nextWaiter == SHARED; } /** * 返回前驱节点 */ final Node predecessor() throws NullPointerException { Node p = prev; if (p == null) throw new NullPointerException(); else return p; } //空节点,用于标记共享模式 Node() { // Used to establish initial head or SHARED marker } //用于同步队列CLH Node(Thread thread, Node mode) { // Used by addWaiter this.nextWaiter = mode; this.thread = thread; } //用于条件队列 Node(Thread thread, int waitStatus) { // Used by Condition this.waitStatus = waitStatus; this.thread = thread; } } /** * 队列头节点 */ private transient volatile Node head; /** * 队列尾节点 */ private transient volatile Node tail; /** * 同步器状态 */ private volatile int state; protected final int getState() { return state; } protected final void setState(int newState) { state = newState; } protected final boolean compareAndSetState(int expect, int update) { // See below for intrinsics setup to support this return unsafe.compareAndSwapInt(this, stateOffset, expect, update); } // Queuing utilities static final long spinForTimeoutThreshold = 1000L; /** * 节点加入CLH同步队列 */ private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { // Must initialize // 队列为空需要初始化,创建空的头节点 if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; // 把当前节点置为尾部 if (compareAndSetTail(t, node)) { // 前驱节点的next指针指向当前节点 t.next = node; return t; } } } } /** * Creates and enqueues node for current thread and given mode. * * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared * @return the new node */ private Node addWaiter(Node mode) { // 1. 将当前线程构建成Node类型 Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail; // 2.1当前尾部节点是否为null if (pred != null) { // 2.2把当前节点以尾部的方式插入 node.prev = pred; // 2.3 CAS把当前节点插入队列尾部 if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node; } /** * Sets head of queue to be node, thus dequeuing. Called only by * acquire methods. Also nulls out unused fields for sake of GC * and to suppress unnecessary signals and traversals. * * @param node the node */ private void setHead(Node node) { head = node; node.thread = null; node.prev = null; } /** * Wakes up node's successor, if one exists. * * @param node the node */ private void unparkSuccessor(Node node) { /* * If status is negative (i.e., possibly needing signal) try * to clear in anticipation of signalling. It is OK if this * fails or if status is changed by waiting thread. */ //获取wait状态 int ws = node.waitStatus; if (ws < 0) // 将等待状态waitStatus设置为初始值0 compareAndSetWaitStatus(node, ws, 0); /* * 若后继结点为空,或状态为CANCEL(已失效),则从后尾部往前遍历找到最前的一个处于正常阻塞状态的结点进行唤醒 */ Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) // 唤醒线程 LockSupport.unpark(s.thread); } /** * Release action for shared mode -- signals successor and ensures * propagation. (Note: For exclusive mode, release just amounts * to calling unparkSuccessor of head if it needs signal.) */ private void doReleaseShared() { /* * 把当前结点设置为SIGNAL或者PROPAGATE唤醒head.next(B节点),B节点唤醒后可以竞争锁,成功后head‐>B,然后又会唤醒B.next,一直重复直到共享节点都唤醒 * head节点状态为SIGNAL,重置head.waitStatus‐>0,唤醒head节点线程,唤醒后线程去竞争共享锁 * head节点状态为0,将head.waitStatus‐>Node.PROPAGATE传播状态,表示需要将状态向后继节点传播 */ for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { /* head状态是SIGNAL,重置head节点waitStatus为0,E这里不直接设为N de.PROPAGAT, * 是因为unparkSuccessor(h)中,如果ws < 0会设置为0,所以ws先设置为0,再设置为PROPAGATE * 这里需要控制并发,因为入口有setHeadAndPropagate跟release两个, 避免两次unpark */ if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases /* head状态为SIGNAL,且成功设置为0之后,唤醒head.next节点线程 * 此时head、head.next的线程都唤醒了,head.next会去竞争锁,成功后h ead会指向获取锁的节点, */ unparkSuccessor(h); } /* * 如果本身头节点的waitStatus是出于重置状态(waitStatus==0)的,将其设置为“传播”状态。 * 意味着需要将状态向后一个节点传播 */ else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } if (h == head) // loop if head changed break; } } /** * 把node节点设置成head节点,且Node.waitStatus‐>Node.PROPAGATE */ private void setHeadAndPropagate(Node node, int propagate) { Node h = head; //h用来保存旧的head节点 setHead(node); //head引用指向node节点 /* 这里意思有两种情况是需要执行唤醒 *** 作 * 1.propagate > 0 表示调用方指明了后继节点需要被唤醒 * 2.头节点后面的节点需要被唤醒(waitStatus<0),不论是老的头结点还是新的头结点 */ if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) { Node s = node.next; //node是最后一个节点或者 node的后继节点是共享节点 if (s == null || s.isShared()) /* 如果head节点状态为SIGNAL,唤醒head节点线程,重置head.waitStat s‐>0 * head节点状态为0(第一次添加时是0),设置head.waitStatus‐>Node.PR OPAGATE表示状态需要向后继节点传播 */ doReleaseShared(); } } // Utilities for various versions of acquire /** * 终结掉正在尝试去获取锁的节点 */ private void cancelAcquire(Node node) { // Ignore if node doesn't exist if (node == null) return; node.thread = null; // Skip cancelled predecessors Node pred = node.prev; while (pred.waitStatus > 0) node.prev = pred = pred.prev; // predNext is the apparent node to unsplice. CASes below will // fail if not, in which case, we lost race vs another cancel // or signal, so no further action is necessary. Node predNext = pred.next; // Can use unconditional write instead of CAS here. // After this atomic step, other Nodes can skip past us. // Before, we are free of interference from other threads. node.waitStatus = Node.CANCELLED; // If we are the tail, remove ourselves. if (node == tail && compareAndSetTail(node, pred)) { compareAndSetNext(pred, predNext, null); } else { // If successor needs signal, try to set pred's next-link // so it will get one. Otherwise wake it up to propagate. int ws; if (pred != head && ((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && pred.thread != null) { Node next = node.next; if (next != null && next.waitStatus <= 0) compareAndSetNext(pred, predNext, next); } else { unparkSuccessor(node); } node.next = node; // help GC } } private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) /* * 若前驱结点的状态是SIGNAL,意味着当前结点可以被安全地park */ return true; if (ws > 0) { /* * 前驱节点状态如果被取消状态,将被移除出队列 */ do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { /* * 当前驱节点waitStatus为 0 or PROPAGATE状态时将其设置为SIGNAL状态,然后当前结点才可以可以被安全地park */ compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; } /** * 中断当前线程 */ static void selfInterrupt() { Thread.currentThread().interrupt(); } /** * 阻塞当前节点,返回当前Thread的中断状态 * LockSupport.park 底层实现逻辑调用系统内核功能 * @return {@code true} if interrupted */ private final boolean parkAndCheckInterrupt() { LockSupport.park(this); return Thread.interrupted(); } /** * 已经在队列当中的Thread节点,准备阻塞等待获取锁 */ final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { // 找到当前节点的前驱节点 final Node p = node.predecessor(); //如果前驱结点是头结点,才tr Acquire,其他结点是没有机会tryAcquire。 if (p == head && tryAcquire(arg)) { //获取同步状态成功,将当前结点设置为头结点。 setHead(node); p.next = null; // help GC failed = false; return interrupted; } /* * 如果前驱节点不是Head,通过shouldParkAfterFailedAcquire判断是否应该阻塞 * 前驱节点信号量为‐1,当前线程可以安全被parkAndCheckInterrupt用来阻塞线程 */ if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } } /** * 与acquireQueued逻辑相似,唯一区别节点还不在队列当中需要先进行入队 *** 作. */ private void doAcquireInterruptibly(int arg) throws InterruptedException { // 以独占模式放入队列尾部 final Node node = addWaiter(Node.EXCLUSIVE); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } } /** * 独占模式定时获取 */ private boolean doAcquireNanos(int arg, long nanosTimeout) throws InterruptedException { if (nanosTimeout <= 0L) return false; final long deadline = System.nanoTime() + nanosTimeout; // 加入队列 final Node node = addWaiter(Node.EXCLUSIVE); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return true; } nanosTimeout = deadline - System.nanoTime(); if (nanosTimeout <= 0L) return false; //超时直接返回获取失败 if (shouldParkAfterFailedAcquire(p, node) && nanosTimeout > spinForTimeoutThreshold) //阻塞指定时长,超时则线程自动被唤醒 LockSupport.parkNanos(this, nanosTimeout); if (Thread.interrupted()) //当前线程中断状态 throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } } /** * 尝试获取共享锁 */ private void doAcquireShared(int arg) { final Node node = addWaiter(Node.SHARED); boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC if (interrupted) selfInterrupt(); failed = false; return; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } } // Main exported methods /** * 尝试获取独占锁,可指定锁的获取数量 */ protected boolean tryAcquire(int arg) { throw new UnsupportedOperationException(); } /** * 尝试释放独占锁,在子类当中实现 */ protected boolean tryRelease(int arg) { throw new UnsupportedOperationException(); } /** * 共享式:共享式地获取同步状态。对于独占式同步组件来讲,同一时刻只有一个线程能获取到同步状态, * 其他线程都得去排队等待,其待重写的尝试获取同步状态的方法tryAcqui e返回值为boolean,这很容易理解; * 对于共享式同步组件来讲,同一时刻可以有多个线程同时获取到同步状态, 这也是“共享”的意义所在。 * 本方法待被之类覆盖实现具体逻辑 * 1.当返回值大于0时,表示获取同步状态成功,同时还有剩余同步状态可供其他线程获取; * 2.当返回值等于0时,表示获取同步状态成功,但没有可用同步状态了; * 3.当返回值小于0时,表示获取同步状态失败。 */ protected int tryAcquireShared(int arg) { throw new UnsupportedOperationException(); } /** * 释放共享锁,具体实现在子类当中实现 */ protected boolean tryReleaseShared(int arg) { throw new UnsupportedOperationException(); } /** * 当前线程是否持有独占锁 */ protected boolean isHeldExclusively() { throw new UnsupportedOperationException(); } /** * A获取独占锁 */ public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) //独占模式 selfInterrupt(); } /** * 获取独占锁,设置最大等待时间 */ public final boolean tryAcquireNanos(int arg, long nanosTimeout) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); return tryAcquire(arg) || doAcquireNanos(arg, nanosTimeout); } /** * 释放独占模式持有的锁 */ public final boolean release(int arg) { if (tryRelease(arg)) { //释放一次锁 Node h = head; if (h != null && h.waitStatus != 0) //唤醒后继结点 unparkSuccessor(h); return true; } return false; } /** * 请求获取共享锁 */ public final void acquireShared(int arg) { if (tryAcquireShared(arg) < 0) doAcquireShared(arg); } /** * 判断当前线程是否在队列当中 */ public final boolean isQueued(Thread thread) { if (thread == null) throw new NullPointerException(); for (Node p = tail; p != null; p = p.prev) if (p.thread == thread) return true; return false; } /** * 判断当前节点是否有前驱节点 */ final boolean apparentlyFirstQueuedIsExclusive() { Node h, s; return (h = head) != null && (s = h.next) != null && !s.isShared() && s.thread != null; } /** * 判断当前节点是否有前驱节点 */ public final boolean hasQueuedPredecessors() { Node t = tail; // Read fields in reverse initialization order Node h = head; Node s; return h != t && ((s = h.next) == null || s.thread != Thread.currentThread()); } // Instrumentation and monitoring methods /** * 同步队列长度 */ public final int getQueueLength() { int n = 0; for (Node p = tail; p != null; p = p.prev) { if (p.thread != null) ++n; } return n; } /** * 获取队列等待thread集合 */ public final Collection<Thread> getQueuedThreads() { ArrayList<Thread> list = new ArrayList<Thread>(); for (Node p = tail; p != null; p = p.prev) { Thread t = p.thread; if (t != null) list.add(t); } return list; } /** * 获取独占模式等待thread线程集合 */ public final Collection<Thread> getExclusiveQueuedThreads() { ArrayList<Thread> list = new ArrayList<Thread>(); for (Node p = tail; p != null; p = p.prev) { if (!p.isShared()) { Thread t = p.thread; if (t != null) list.add(t); } } return list; } /** * 获取共享模式等待thread集合 */ public final Collection<Thread> getSharedQueuedThreads() { ArrayList<Thread> list = new ArrayList<Thread>(); for (Node p = tail; p != null; p = p.prev) { if (p.isShared()) { Thread t = p.thread; if (t != null) list.add(t); } } return list; } // Internal support methods for Conditions /** * 判断节点是否在同步队列中 */ final boolean isOnSyncQueue(Node node) { if (node.waitStatus == Node.CONDITION || node.prev == null) return false; if (node.next != null) // If has successor, it must be on queue return true; return findNodeFromTail(node); } /** * 将节点从条件队列当中移动到同步队列当中,等待获取锁 */ final boolean transferForSignal(Node node) { /* * 修改节点信号量状态为0,失败直接返回false */ if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) return false; // 加入同步队列尾部当中,返回前驱节点 Node p = enq(node); int ws = p.waitStatus; //前驱节点不可用 或者 修改信号量状态失败 if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) LockSupport.unpark(node.thread); //唤醒当前节点 return true; } }

欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/langs/870870.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-05-13
下一篇 2022-05-13

发表评论

登录后才能评论

评论列表(0条)

保存