- Lock 和 AQS
- Lock接口
- AQS
- AQS同步器原理
Lock
实现提供比使用synchronized方法和语句可以获得的更广泛的锁定 *** 作。它们允许更灵活的结构化,可能具有完全不同的属性,并且可以支持多个相关联的对象Condition。
Lock接口提供的方法 *** 作:
void lock()
获取锁,如果锁被使用会一直阻塞直至获取到锁。
void lockInterruptibly() throws InterruptedException
如果当前线程未被中断,则获取锁;如果锁可用,则获取锁,并立即返回,如果在加锁过程中发生了Interrupt中断 *** 作,会抛出InterruptedException异常,并中断掉当前线程的加锁状态。
boolean tryLock()
尝试获取锁,仅在调用是锁为空闲状态才获取锁,如果锁是可用的返回true,如果锁不可用,该方法会立即返回false。
boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException
在指定时间内尝试性获取锁
void unlock()
释放锁。加锁和释放锁是成对出现的,对应于lock()、tryLock()、tryLock(xx)、lockInterruptibly()等 *** 作,如果成功的话应该对应一个unlock *** 作,这样可以避免死锁或者资源的浪费。
Condition newCondition()
AQS返回绑定到Lock实例的新的Condition实例,可以进行线程间通信。
AQS
(AbstractQueuedSynchronizer)是J.U.C下较复杂的一个类,提供了一个为实现依赖于先进先出 (FIFO)等待队列的阻塞锁和相关同步器(信号量、事件,等等)提供一个框架。
通过上面查看类的子类的层级关系可知:AQS
是CountdownLatch、ReentrankLock、ThreadPoolExecutor、ReentrankReadWriterLock和Semaphore实现的基础。
🐣AQS存在的必要性
上图中体现AQS
很重要的一个概念:“阻塞队列”,当线程无法获取资源时,提供一个FIFO类型的有序队列,用于维护所有处于“等待中”的线程。
🐥AQS核心字段:
主要有三个核心字段:
① private transient volatile Node head;
② private transient volatile Node tail;
③ private volatile int state;
其中state是描述有多少个线程获取锁
state=0:表示锁第空闲状态
state>0:表示锁被占用
state<0: 表示溢出
state在AQS中是一个很重要的属性,不管是独占锁还是共享锁,通过CAS *** 作该字段,如果是独占锁,需要通过CAS将其从0变为1,标记加锁成功;如果是共享锁,表示的是并发的线程数。
head和tail加上CAS就构成了一个FIFO队列,是一个双向链表,每个节点是Node类型,通过双向队列来完成同步状态的管理,如果当前线程获取同步状态失败,AQS将会把当前线程以及等待的状态信息构建成一个节点Node并加入到同步队列中,同时会阻塞当前线程,当同步状态释放掉,会将处于队列首节点的线程唤醒,让该线程获取同步状态。
在队列中节点用来保存同步状态的线程(thread),等待状态(waitStatus)和前驱节点(prev)和后继节点(next)。
static final class Node {
/** 标记表示节点正在共享模式中等待 */
static final Node SHARED = new Node();
/** 标记表示节点正在独占模式下等待 */
static final Node EXCLUSIVE = null;
/**
* 表示线程已经被取消
* 同步队列中的线程因为超时或中断,需要从同步队列中取消。被取消的节点将不会有任何改变
*/
static final int CANCELLED = 1;
/**
* 后继节点的线程处于等待状态,而当前节点的线程如果释放了同步状态或者被取消,将会通知后继节点,使后继节点的线程得以运行
*/
static final int SIGNAL = -1;
/**
* 节点在等待队列中,节点线程等待在Condition上,当其他线程对Condition调用了signal()方法
* 后,该节点将会中等待队列中转移到同步队列中,加入到对同步状态的获取
*/
static final int CONDITION = -2;
/**
* 下一次共享模式同步状态获取将会无条件的被传播下去
*/
static final int PROPAGATE = -3;
/**
* 等待状态,仅接受如下状态中的一个值:
* SIGNAL: -1
* CANCELLED: 1
* CONDITION: -2
* PROPAGATE: -3
* 0: 初始化的值
*
* 对于正常的同步节点,它的初始化值为0,对于条件节点它的初始化的值是CONDITION。它使用
* CAS进行修改。
*/
volatile int waitStatus;
/**
* 前驱节点
*/
volatile Node prev;
/**
* 后继节点
*/
volatile Node next;
/**
* 获取同步状态的线程
*/
volatile Thread thread;
/**
* 等待队列中的后继节点。如果当前节点是共享的,那么这个字段是一个SHARED常量,也就是说
* 节点类型(独占和共享)和等待队列中的后继节点公用同一个字段
*/
Node nextWaiter;
/**
* 如果节点在共享模式下等待则返回true
*/
final boolean isShared() {
return nextWaiter == SHARED;
}
/**
* 获取前驱节点
*/
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
Node() {
}
Node(Thread thread, Node mode) {
this.nextWaiter = mode;
this.thread = thread;
}
Node(Thread thread, int waitStatus) {
this.waitStatus = waitStatus;
this.thread = thread;
}
}
volatile int waitStatus:节点的等待状态,一个节点可能位于以下几种状态:
-
CANCELLED = 1
当前的线程被取消,节点 *** 作因为超时或者对应的线程被interrupt。节点不应该留在此状态,一旦达到此状态将从CHL队列中踢出。 -
SIGNAL = -1
表示当前节点的后继节点包含的线程需要运行,也就是unpark.节点的继任节点是(或者将要成为)BLOCKED状态(例如通过LockSupport.park() *** 作),因此一个节点一旦被释放(解锁)或者取消就需要唤醒(LockSupport.unpack())它的继任节点。 -
CONDITION = -2
当前节点在等待condition,也就是在condition队列中。表明节点对应的线程因为不满足一个条件(Condition)而被阻塞。 -
PROPAGATE=-3
当前场景下后续的acquireShared能够得以执行。 -
0
当前节点在sync队列中,等待着获取锁
正常状态,新生的非CONDITION节点都是此状态。
非负值标识节点不需要被通知(唤醒)。
volatile Node prev;此节点的前一个节点。节点的waitStatus依赖于前一个节点的状态。
volatile Node next;此节点的后一个节点。后一个节点是否被唤醒(uppark())依赖于当前节点是否被释放。
volatile Thread thread;节点绑定的线程。
Node nextWaiter;下一个等待条件(Condition)的节点,由于Condition是独占模式,因此这里有一个简单的队列来描述Condition上的线程节点。
AQS同步器原理节点(Node)是构成CHL的基础,同步器拥有首节点(head)和尾节点(tail),没有成功获取同步状态的线程会构建成一个节点并加入到同步器的尾部。CHL的基本结构如下:
基本的思想是表现为一个同步器,支持下面两个 *** 作:
获取锁
:首先判断当前状态是否允许获取锁,如果是就获取锁, 阻塞 *** 作或者获取失败,也就是说如果是独占锁就可能阻塞,如果是共享锁就可能失败。另外如果是阻塞线程,那么线程就需要进入阻塞队列。当状态位允许获取锁时就修改状态,并且如果进了队列就从队列中移除。
释放锁
:这个过程就是修改状态位,如果有线程因为状态位阻塞的话就唤醒队列中的一个或者更多线程。
要支持上面两个 *** 作就必须有下面的条件:
● 原子性 *** 作同步器的状态位
● 阻塞和唤醒线程
● 一个有序的队列
目标明确,要解决的问题也清晰了,那么剩下的就是解决上面三个问题。
-
状态位的原子 *** 作
这里使用一个32位的整数来描述状态位,使用CAS *** 作来修改状态。事实上这里还有一个64位版本的同步器(AbstractQueuedLongSynchronizer),这里暂且不谈。 -
阻塞和唤醒线程
标准的JAVA API里面是无法挂起(阻塞)一个线程,然后在将来某个时刻再唤醒它的。JDK 1.0的API里面有Thread.suspend和Thread.resume,并且一直延续了下来。但是这些都是过时的API,而且也是不推荐的做法。
在JDK 1.5以后利用JNI在LockSupport类中实现了此特性。
LockSupport.park()
LockSupport.park(Object)
LockSupport.parkNanos(Object, long)
LockSupport.parkNanos(long)
LockSupport.parkUntil(Object, long)
LockSupport.parkUntil(long)
LockSupport.unpark(Thread)
上面的API中park()是在当前线程中调用,导致线程阻塞,带参数的Object是挂起的对象,这样监视的时候就能够知道此线程是因为什么资源而阻塞的。由于park()立即返回,所以通常情况下需要在循环中去检测竞争资源来决定是否进行下一次阻塞。park()返回的原因有三:
①其他某个线程调用将当前线程作为目标调用 unpark;
② 其他某个线程中断当前线程;
③ 该调用不合逻辑地(即毫无理由地)返回。
其实第三条就决定了需要循环检测了,类似于通常写的while(checkCondition()){Thread.sleep(time);}类似的功能。 -
有序队列
在AQS中采用CHL列表来解决有序的队列的问题。
AQS
采用的CHL模型采用下面的算法完成FIFO的入队列和出队列过程。
查看类的方法和属性
对于入队列(enqueue):从数据结构上出发,入列是比较简单的,无非就是当前队列中的尾节点指向新节点,新节点的prev指向队列中的尾节点,然后将同步器的tail节点指向新节点。在AQS中入列的源码如下:
/**
* 为当前线程和给定的模式创建节点并计入到同步队列中
*
* @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
* @return the new node
*/
private Node addWaiter(Node mode) {
// 创建一个节点
Node node = new Node(Thread.currentThread(), mode);
// 快速尝试添加尾节点,如果失败则调用enq(Node node)方法设置尾节点
Node pred = tail;
// 判断tail节点是否为空,不为空则添加节点到队列中
if (pred != null) {
node.prev = pred;
// CAS设置尾节点
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
/**
* 插入节点到队列中
* @param node the node to insert
* @return node's predecessor
*/
private Node enq(final Node node) {
// 死循环 知道将节点插入到队列中为止
for (;;) {
Node t = tail;
// 如果队列为空,则首先添加一个空节点到队列中
if (t == null) {
if (compareAndSetHead(new Node()))
tail = head;
} else {
// tail 不为空,则CAS设置尾节点
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
从上面源码中我们可以看到,在将节点添加到CHL尾部的时候,使用了一个CAS方法(compareAndSetTail(pred, node)),这里使用CAS的原因是防止在并发添加尾节点的时候出现线程不安全的问题(即有可能出现遗漏节点的情况)
入队列的过程:
同步队列遵循FIFO规范,首节点的线程在释放同步状态后,将会唤醒后继节点的线程,并且后继节点的线程在获取到同步状态后将会将自己设置为首节点。因为设置首节点是通过获取同步状态成功的线程来完成的,因此设置头结点的方法并不需要使用CAS来保证,因为只有一个线程能获取到同步状态。CHL出列的过程如下:
AQS 在J.U.C里面是一个非常核心的工具,而且也非常复杂,里面考虑到了非常多的逻辑实现。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)