并发编程系列之AQS实现原理
1、什么是AQS?AQS(AbstractQueuedSynchronizer),抽象队列同步器,是juc中很多Lock锁和同步组件的基础,比如CountDownLatch、ReentrantLock、ReentrantReadWriteLock、Semaphore等等,提供了对资源的占用、释放,线程的等待、唤醒等等接口或具体实现,可以用在各种需要控制资源竞争的场景中。
2、理清AQS整体架构看一下juc包里的AbstractQueuedSynchronizer的源码:
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { //头结点 private transient volatile Node head; //尾节点 private transient volatile Node tail; //volatile修饰的信号量 private volatile int state; // 链表的Node节点 static final class Node { volatile Node prev; volatile Node next; volatile Thread thread; } // ... } // AbstractQueuedSynchronizer父类 public abstract class AbstractOwnableSynchronizer implements java.io.Serializable { private transient Thread exclusiveOwnerThread; // ... }
AQS主要由如下组件构成,最主要的还是信号量stata和CHL队列
源码里,state是由volatile关键字修饰的,在前面的学习中,可以知道并发编程的三大特性是:原子性、有序性、可见性。而volatile修饰的变量,可以保证有序性和可见性。
volatile保证可见性
写state值,volatile特性会将数据同步到主内存读state值,volatile特性会重新从主内存加载数据到工作内存
这个涉及到JMM,不熟悉的读者可以翻一下我之前博客
cpu、jvm、内存都会在代码执行过程进行指令重排序,重排序过程,可能出现一些程序异常,所以,要保证有序性,可以加上volatile关键字,volatile避免重排序依赖于 *** 作系统的内存屏障。
前面说明了加volatile的原因,然后还有一个特性,原子性,volatile只能保证单个变量的原子性,并不能保证一系列 *** 作的原子性,所以不是线程安全的,然后AQS里是怎么保证线程安全的?然后AQS源码里怎么实现的?翻下源码,找到两个地方:
这里就是很常见的CAS锁,利用了unsafe的api,利用cpu *** 作的原子性保证这一 *** 作的原子性
protected final boolean compareAndSetState(int expect, int update) { return unsafe.compareAndSwapInt(this, stateOffset, expect, update); }
这个方法,直接将一个newStata设置,因为修饰的是一个变量,对基本类型的变量进行直接赋值时,是可以保证原子性的
protected final void setState(int newState) { state = newState; }2.2、FIFO队列
AQS 作者 Doug Lea 给出的一份AQS文档:http://gee.cs.oswego.edu/dl/papers/aqs.pdf,里面对AQS做了比较详细的描述,英文水平比较好的读者可以看看
CHL队列是AQS的一个核心,FIFO 队列,即先进先出队列,这个队列的主要作用是存储等待的线程,假设很多线程去抢锁,大部分线程是抢不到锁的,所以这些线程需要有个队列来存储,这个队列就是CHL队列,同样遵循FIFO规则。
CHL队列内部结构是一个双向链表的数据结构,基本组件是Node节点,队列中,分别用 head 和 tail 来表示头节点和尾节点,两者在初始化的时候都指向了一个空节点,head节点的线程就可以持有锁,然后后面的线程排队等待,如图所示
AbstractQueuedSynchronizer类extends AbstractOwnableSynchronizer类,AbstractOwnableSynchronizer有exclusiveOwnerThread这个定义的线程对象,表示独占模式下同步器的持有者。
3、AQS实现原理前面已经对AQS的整体设计有了一个比较清晰的了解,主要关注点是信号量state和CHL队列,然后可以跟下源码实现,@see java.util.concurrent.locks.AbstractOwnableSynchronizer,看源码不需要一点点看,看下主要的方法和类即可,理清框架的实现原理就可以,AQS主要关注*acquire* 和 *release*这些方法就可以,分为两种类型方法,一种是独占式,另外一种是共享式,共享式方法都有加上一个shared后缀
acquire、acquireShared:定义了资源争用的逻辑,如果没拿到,就等待tryAcquire、tryAcquireShared:实际执行占用资源的 *** 作,由具体的AQS使用者实现release、releasedShared:定义了资源释放的逻辑,资源释放之后,调整队列后续节点继续争抢tryRelease、tryReleaseShared:实际执行资源释放的 *** 作,具体的由AQS使用者实现 3.1、独占式
下面跟下AQS源码,注意对比独占式和共享式的实现方式有什么区别
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
acquire方法总体思路:
tryAcquire()方法尝试去获取资源,如果成功直接返回如果获取失败,通过addWaiter方法将当前线程包装成Node插入到CLH队列尾部,独占的标记为Node.EXCLUSIVEacquireQueued方法将线程阻塞在等待队列,直到获取到资源。整个等待过程,线程被中断过,返回true,否则返回false。获取到资源后而且被中断过,就会调用selfInterrupt将中断补上
tryAcquire方法由具体使用者类实现,不实现是会抛出异常的,所以必须实现
protected boolean tryAcquire(int arg) { throw new UnsupportedOperationException(); }
addWaiter方法作用,将当前线程包装成Node插入到CLH队列尾部
private Node addWaiter(Node mode) { // 将当前线程封装成Node节点 Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail; // 尾节点不能空,队列已经初始化过 if (pred != null) { // Node节点的前prev指针指向尾节点 node.prev = pred; // cas锁 *** 作,保证线程安全 if (compareAndSetTail(pred, node)) { // 尾节点指针指向Node节点 pred.next = node; return node; } } // 队列没初始化过,调用enq方法 enq(node); return node; }
跟一下enq方法
private Node enq(final Node node) { // 无限循环,类似于while(true),自旋 *** 作 for (;;) { // 尾节点 Node t = tail; if (t == null) { // Must initialize // 创建new一个节点,设置为头节点 if (compareAndSetHead(new Node())) // 头结点赋值给尾结点 tail = head; } else { // 如果尾节点不为空,将Node节点的前驱指向尾节点 node.prev = t; // cas,将node节点设置为尾节点 if (compareAndSetTail(t, node)) { // 尾节点的next指针指向Node节点 t.next = node; return t; } } } }
再往上看一下java.util.concurrent.locks.AbstractQueuedSynchronizer#acquire方法的acquireQueued方法,前面说到acquireQueued方法作用是,线程排队等待,直到获取到资源
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; // 自旋,无限循环 for (;;) { // 寻找node节点前驱节点 final Node p = node.predecessor(); // 前驱节点是head节点,tryAcquire if (p == head && tryAcquire(arg)) { // 获取资源成功,将node节点设置为head新的节点 setHead(node); // 原来的head节点设置为null,等待GC垃圾回收 p.next = null; // help GC failed = false; return interrupted; } // p不是头结点,或者tryAcquire()获取资源失败 // shouldParkAfterFailedAcquire判断是否可以被park // parkAndCheckInterrupt判断park和Interrupt是否成功 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }3.2、共享式
共享式即共享资源可以被多个线程同时占有
先看顶层的acquireShared方法:
public final void acquireShared(int arg) { if (tryAcquireShared(arg) < 0) doAcquireShared(arg); }
tryAcquireShared方法是由具体使用者类去实现的,所以先看看doAcquireShared源码:
private void doAcquireShared(int arg) { // 封装为Node,标记为Node.SHARED final Node node = addWaiter(Node.SHARED); boolean failed = true; try { boolean interrupted = false; // 无限循环,自旋 for (;;) { // 查找前驱节点 final Node p = node.predecessor(); // 是否为head节点 if (p == head) { // 尝试抢资源 int r = tryAcquireShared(arg); if (r >= 0) { // 抢资源成功 // 设置为头节点 setHeadAndPropagate(node, r); p.next = null; // help GC if (interrupted) selfInterrupt(); failed = false; return; } } // p不是头结点,或者tryAcquire()获取资源失败 // shouldParkAfterFailedAcquire判断是否可以被park // parkAndCheckInterrupt判断park和Interrupt是否成功 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
这个逻辑和独占式基本差不多,不同的地方在于入队的Node是标志为SHARED共享式的,tryAcquireShared是由具体的类去实现
参考资料详细讲解!JUC中重要的AQSAQS原理|慕课网教程JUC AQS ReentrantLock源码分析(一)
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)