AQS是用来构建锁和其他同步组件的基础框架,它也是Java三大并发工具类(CountDownLatch、CyclicBarrier、Semaphore)的基础。ReentrantLock,甚至BlockingQueue也是基于它的实现。本文从ReentrantLock的lock()出发,从源码维度分析一个加锁解锁动作都干了什么事。
公平锁:多个线程按照申请锁的顺序去获得锁,线程会直接进入队列去排队,永远都是队列的第一位才能得到锁。
非公平锁:多个线程去获取锁的时候,会直接去尝试获取,获取不到,再去进入等待队列,如果能获取到,就直接获取到锁。
基础数据结构 我们可以先思考一下ReentrantLock类能够提供的功能以及基础的语义。
多个线程竞争同一把lock锁,这时分为独享锁(排他锁)和共享锁(ReentrantLock语义是独享锁),独享锁竞争成功则只有一个线程可以持有,竞争失败的线程切换状态进入阻塞队列等待持有锁的线程解锁唤醒。且ReentrantLock实现了公平锁与非公平锁,还实现了可重入功能。
综上,AQS应该维护一个阻塞队列,应该维护一个变量表示锁重入的次数。此时我们再来看看数据结构。
// 虽然名称为head,但不属于阻塞队列,可以理解阻塞队列的管理单元。 private transient volatile Node head; // 阻塞队列的尾节点,每个新的节点进来,都插入到最后 private transient volatile Node tail; // 代表锁的状态,0表示锁没有被占用,大于0表示锁被重入的次数 private volatile int state; // 持有独占锁的线程,存储下来方便判断是否是重入 private transient Thread exclusiveOwnerThread;
可以说变量state是所有变量中最重要的了,通过它我们可以判断该锁对象是否可以获取以及确定重入的次数,竞争锁也就是多线程间抢着把state从0变成1,而解锁动作也就是线程把state从n变成0。
AQS把每个线程封装成了node对象,通过node对象串成双向链表维护队列。
class Node { // 表示该节点处于共享模式 static final Node SHARED = new Node(); // 标识该节点处于独占模式 static final Node EXCLUSIVE = null; // 等待的状态,属性值为下面的常量,初始化为0,正常状态 volatile int waitStatus; // 前面的节点 volatile Node prev; // 后面的节点 volatile Node next; // 被封装过来的线程 volatile Thread thread; // 代表此线程取消了争抢这个锁 static final int CANCELLED = 1; // 表示具备唤醒当前node的后继节点的能力,释放资源后会唤醒后继节点 static final int SIGNAL = -1; // 等待condition唤醒 static final int ConDITION = -2; // (共享锁)状态需要向后传播 static final int PROPAGATE = -3; }加锁 1. 获取锁
从基础来讲,获取锁的语义很简单,成功了则获取到,失败了则线程切换状态阻塞在队列中。
对应的源代码如下:
final void lock() { //调用父类AQS的代码 acquire(1); } //父类AQS的代码 public final void acquire(int arg) { if (!tryAcquire(arg) && // tryAcquire(arg)没有成功,这个时候需要把当前线程挂起,放到阻塞队列中。 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) { selfInterrupt(); } }
可以说acquire()方法就是AQS的核心了,接下来将从尝试获取锁、加入阻塞队列、挂起线程三个方面详尽展开。
2. tryAcquire() 抛开代码只看数据结构,让我们自己设计这个函数,你会怎么设计呢?
功能:抢占锁、可重入、公平锁。
首先,检查锁对象的state变量,判断是不是0,如果是0的话,代表当前锁是可占用的,那么此时我们可以立刻去抢占吗?
显然是不行的,因为我们要实现公平锁,state == 0出现的一种情况是持有锁的线程刚释放锁,唤醒等待队列里的线程让他们去占用锁,而等待队列里的线程正在尝试获取锁,但还没成功,此时未入队的线程看到的state就是为0的。而公平锁的语义是保证先让等待队列里的线程获取到锁后再轮到自己,因此我们要加第二个判断条件,看等待队列里是否有线程,如果没有的话,那么我们可以去抢占锁。
多线程环境下,要时刻注意线程安全问题,所以抢占锁的动作一般都是用CAS保证安全的。
不幸的是,多线程环境下,很可能不止你一个线程想要竞争这个锁,也很可能多个线程同时发现了锁可占用而同时去抢占,而ReentrantLock类又是排他锁,所以只能有一个线程竞争成功,其他都失败。
总结一下,我们尝试获取锁有三种可能失败的方式:
-
state > 0
-
state == 0且等待队列里有线程
-
state == 0 && 等待队列为空且没抢过其他线程
由于ReentrantLock是可重入的,竞争失败后我们要对比下持有锁的线程是不是就是自己,如果是的话直接++state即可,相当于重入了一次,不是的话说明我们竞争锁彻底失败,准备进等待队列里阻塞去吧。
整理一下我们的流程:
将思路转化成代码如下(真实的AQS源码):
protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); // state == 0 此时此刻没有线程持有锁 if (c == 0) { // 看看队列中是否有其他线程 if (!hasQueuedPredecessors() && // 如果没有线程在等待,那就用CAS尝试一下,成功了就获取到锁了, // 不成功的话,说明就在刚刚几乎同一时刻有个线程抢先了 compareAndSetState(0, acquires)) { // 到这里就是获取到锁了,标记一下,告诉大家,现在是我占用了锁 setExclusiveOwnerThread(current); return true; } } // 是否重入? else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } // 没有重入,接收失败的事实要被阻塞了 return false; }3. addWaiter()
加入阻塞队列,让你来设计,你怎么设计?
其实就是向双向链表尾部插入节点嘛,但是在多线程竞争情况下我们要充分考虑可能会发生的情况:
- 回顾之前的代码,我们发现head和tail没有进行初始化,也就是为null,此时如果直接插入的话会报空指针异常,因此要将其初始化,初始化触发的条件就是队列为空。
- 多线程环境下,可能会有多个线程同时入队,因此要注意线程安全问题。
开始设计:
-
首先考虑head和tail初始化的问题,判断队列是否为空。
-
如果不为空,说明初始化没什么问题,那么要考虑多线程安全问题,所以将节点CAS插入队尾,返回false则更改期望值继续循环插入。
-
如果队列为空,那么初始化head和tail节点。此处引申出一个问题,队列空一定是因为没有初始化head和tail吗?一定是的,既然我们进到这个函数,说明当前线程抢占锁失败了,失败的原因可见上文那三点,通过那三点可以推断出head和tail一定没有初始化。
-
初始化成功后,继续CAS插入队尾。
整理一下流程:
代码实现(AQS源码):
private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); // 以下几行代码想把当前node加到链表的最后面去,也就是进到阻塞队列的最后 Node pred = tail; // tail!=null => 队列不为空(tail==head的时候,其实队列是空的,不过不管这个吧) if (pred != null) { // 将当前的队尾节点,设置为自己的前驱 node.prev = pred; // 用CAS把自己设置为队尾, 如果成功后,tail == node 了,这个节点成为阻塞队列新的尾巴 if (compareAndSetTail(pred, node)) { // 进到这里说明设置成功,当前node==tail, 将自己与之前的队尾相连, // 上面已经有 node.prev = pred,加上下面这句,也就实现了和之前的尾节点双向连接了 pred.next = node; // 线程入队了,可以返回了 return node; } } // 仔细看看上面的代码,如果会到这里, // 说明 pred==null(队列是空的) 或者 CAS失败(有线程在竞争入队) // 读者一定要跟上思路,如果没有跟上,建议先不要往下读了,往回仔细看,否则会浪费时间的 enq(node); return node; } private Node enq(final Node node) { for (;;) { Node t = tail; // 之前说过,队列为空也会进来这里 if (t == null) { // 初始化head节点和tail节点 if (compareAndSetHead(new Node())) // 初始化的head节点的 waitstaus == 0 // 注意:这里只是设置了tail=head,但是还没将其插到队尾(因为head不属于阻塞队列) tail = head; } else { // 循环CAS用,成功就return node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }4. acquireQueued()
现在我们已经将节点加入进了阻塞队列,下一步是挂起线程,如果让你来设计,你怎么设计?
按理来说,加入到阻塞队列我们直接将其挂起就好了,但是我们考虑一种情况,如果该节点加到了等待队列里的第一个位置,也就是说它的前驱节点就是head,我们是否可以尝试一下抢占锁,因为很有可能在我们入队的过程中锁被释放了。
开始设计:
- 判断前驱节点是否是head?
- 如果是head,尝试下抢占锁
- 如果不是head,挂起线程
流程整理:
代码实现(AQS源码):
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); // p == head 说明当前节点虽然进到了阻塞队列,但是是阻塞队列的第一个,因为它的前驱是head,可以尝试一下抢占锁 // 所以当前节点可以去试抢一下锁 // 这里我们说一下,为什么可以去试试: // 首先,它是队头,这个是第一个条件,其次,当前的head有可能是刚刚初始化的node, // enq(node) 方法里面有提到,head是延时初始化的,而且new Node()的时候没有设置任何线程 // 也就是说,当前的head不属于任何一个线程,所以作为队头,可以去试一试, // tryAcquire已经分析过了, 忘记了请往前看一下,就是简单用CAS试 *** 作一下state if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } // 到这里,说明上面的if分支没有成功,要么当前node本来就不是队头, // 要么就是tryAcquire(arg)没有抢赢别人,继续往下看 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { // 什么时候 failed 会为 true??? // tryAcquire() 方法抛异常的情况 if (failed) cancelAcquire(node); } } private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; // 前驱节点的 waitStatus == -1 ,说明前驱节点状态正常,当前线程需要挂起,直接可以返回true if (ws == Node.SIGNAL) return true; // 前驱节点 waitStatus大于0 ,之前说过,大于0 说明前驱节点取消了排队。 // 这里需要知道这点:进入阻塞队列排队的线程会被挂起,而唤醒的 *** 作是由前驱节点完成的。 // 所以下面这块代码说的是将当前节点的prev指向waitStatus<=0的节点, // 简单说,就是为了找个好爹,因为你还得依赖它来唤醒呢,如果前驱节点取消了排队, // 找前驱节点的前驱节点做爹,往前遍历总能找到一个好爹的 if (ws > 0) { do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { // 仔细想想,如果进入到这个分支意味着什么 // 前驱节点的waitStatus不等于-1和1,那也就是只可能是0,-2,-3 // 正常情况下,前驱节点是之前的 tail,那么它的 waitStatus 应该是初始化后的0 // 用CAS将前驱节点的waitStatus设置为Node.SIGNAL(也就是-1) compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } // 这个方法返回 false,那么会再走一次 for 循序, // 然后再次进来此方法,此时会从第一个分支返回 true return false; } // 挂起线程的代码 private final boolean parkAndCheckInterrupt() { LockSupport.park(this); return Thread.interrupted(); }
直到此处,我们终于将一个线程抢占一个lock锁的逻辑与代码分析完毕。
解锁 解锁的 *** 作相对于来说简单很多,就是更新state,唤醒后继节点。
public void unlock() { sync.release(1); } public final boolean release(int arg) { // 和加锁的设计模式一样 --- 模板方法模式 if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; } // 回到ReentrantLock看tryRelease方法 protected final boolean tryRelease(int releases) { int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); // 是否完全释放锁 boolean free = false; // 其实就是重入的问题,如果c==0,也就是说没有嵌套锁了,可以释放了,否则还不能释放掉 if (c == 0) { free = true; setExclusiveOwnerThread(null); } setState(c); return free; } // 唤醒后继节点 // 从上面调用处知道,参数node永远都是head头结点 private void unparkSuccessor(Node node) { int ws = node.waitStatus; // 如果head节点当前waitStatus<0, 将其修改为0 if (ws < 0) compareAndSetWaitStatus(node, ws, 0); // 下面的代码就是唤醒后继节点,但是有可能后继节点取消了等待(waitStatus==1) // 从队尾往前找,找到waitStatus <= 0的所有节点中排在最前面的 Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; // 从后往前找,仔细看代码,不必担心中间有节点取消(waitStatus==1)的情况 for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) // 唤醒线程 LockSupport.unpark(s.thread); }
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)