ReentrantLock是一个互斥锁,它具有synchronized相同的能力;但相比之下,ReentrantLock扩展性更强,比如实现了公平锁。
下面详细拆解下ReentrantLock的公平锁和非公平锁的实现。
JDK版本:1.8.0_40
公平锁先看ReentrantLock两个构造方法:
// 默认非公平
public ReentrantLock() {
sync = new NonfairSync();
}
// 根据传参来实现公平或非公平锁
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
因此,要想实现公平锁,只需要传参为true即可。
先来看一个测试类,后面都将围绕该测试类来分享:
public class TestReentrantLock {
private static ReentrantLock lock = new ReentrantLock(true);
public static void main(String[] args) {
Runnable runnable = () -> {
process();
};
Thread t0 = new Thread(runnable);
Thread t1 = new Thread(runnable);
Thread t2 = new Thread(runnable);
t0.start();
t1.start();
t2.start();
}
public static void process() {
String name = Thread.currentThread().getName();
System.out.println(name +" 尝试获取锁");
lock.lock();
try {
System.out.println(name +" 已获取锁");
sleep(1000L);
} finally {
System.out.println(name +" 释放锁");
lock.unlock();
}
}
private static void sleep(Long millis) {
try {
Thread.sleep(millis);
} catch (InterruptedException e) {
// ignore
}
}
}
输出:
Thread-0 尝试获取锁
Thread-2 尝试获取锁
Thread-1 尝试获取锁
Thread-0 已获取锁
Thread-0 释放锁
Thread-2 已获取锁
Thread-2 释放锁
Thread-1 已获取锁
Thread-1 释放锁
根据这个测试类,通过画图的方式,一起来看看每一步都执行了什么。
lock()过程先来看看lock()过程
1)首先创建一个公平锁,ReentrantLock lock = new ReentrantLock(true);
2)t0尝试获取锁,lock.lock(),该方法会调用父类的acquire方法
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
tryAcquire(arg)方法:
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
// 1.判断state是否为0,因为t0是第一个进来的,此时state为0
if (c == 0) {
// 2.hasQueuedPredecessors() 队列是否有内容,此时没有内容
// 3.compareAndSetState(0, acquires) 尝试将state改成1,我们假设t0线程修改成功
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
// 4.将成员变量exclusiveOwnerThread修改为当前线程
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
由于该方法返回了true,!tryAcquire(arg)为false,acquire(int arg)方法直接返回了。
t0拿到了锁,开始处理业务。此时该对象锁如图:(绿色表明与上一个图的变化,下面也是一样)
3)我们假设t0还未释放锁,接着由t1尝试获取锁,lock.lock(),再来看acquire方法
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
还是首先看tryAcquire(arg)
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
// 1.判断state是否为0,因为t0已经将该值改成了1
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// exclusiveOwnerThread 也被改成了t0
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
因未释放锁,因此该方法会返回false。!tryAcquire(arg)为true,接着会执行acquireQueued(addWaiter(Node.EXCLUSIVE), arg)方法。
先看看addWaiter(Node.EXCLUSIVE)做了什么:
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
// 此时tail为null,不会执行这个方法
if (pred != null) {
// 如果t1已经入队,t2则直接加入到队尾,tail指向t2
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// 入队
enq(node);
return node;
}
private Node enq(final Node node) {
for (;;) {
// 记录尾部node
Node t = tail;
if (t == null) { // Must initialize
// 第1次循环,tail为null,给head和tail赋值一个空的Node
if (compareAndSetHead(new Node()))
tail = head;
} else {
// 非第1次循环,将传入的node的prev指向刚新建的node
node.prev = t;
// 将传入的node赋值给tail,这里会存在竞争,如果有多个同时修改,那么未修改成功的将进入下一次循环
if (compareAndSetTail(t, node)) {
// 将之前记录的尾部的next指向传入的node,也就是指向尾部
t.next = node;
return t;
}
}
}
}
入队,第1次循环:
第2次循环,将t1入队:
如果在第2次循环的时候,存在竞争,则未竞争成功的会继续第3次循环,最终都将入队:
入队成功之后,我们看下acquireQueued方法:
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
// 获取当前node的prev节点
final Node p = node.predecessor();
// 如上图所示,如果当前节点是t1的话,会再次尝试获取锁。
if (p == head && tryAcquire(arg)) {
// 能获取到锁,表明t0已经释放了锁,此时将head指向当前node
setHead(node);
// prev节点next指向置为空,方便回收
p.next = null; // help GC
failed = false;
return interrupted;
}
// 如果t0未释放,或prev不是head
// shouldParkAfterFailedAcquire(p, node) 该方法是给每个节点的prev节点的waitStatus设置为Node.SIGNAL,即-1
// parkAndCheckInterrupt() 暂停线程
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
该方法每个节点的prev节点的waitStatus设置为Node.SIGNAL,即-1,并且使线程暂停,如图:
到此,lock()到此结束,除t0正在执行外,其他都在等待。
unlock()过程再来看看unlock过程
1)t0业务逻辑执行完毕,开始释放锁
public final boolean release(int arg) {
// 尝试释放锁
if (tryRelease(arg)) {
Node h = head;
// 判断head是否不为0,在lock的时候,waitStatus都已改成-1
if (h != null && h.waitStatus != 0)
// 唤醒线程
unparkSuccessor(h);
return true;
}
return false;
}
protected final boolean tryRelease(int releases) {
// 直接把state减1
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
// 不为0,则该线程之前存在重入锁情况
if (c == 0) {
// 为0,则当前线程占用的锁全部释放
free = true;
// 将锁的占有线程置为null
setExclusiveOwnerThread(null);
}
// 设置新的state值,因为此时不会有竞争,可直接设置
setState(c);
return free;
}
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
// 将head waitStatus改成0
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
// head的next的node节点waitStatus大于0,即已取消,
// 则从tail开始循环获取到队列头部第一个waitStatus<=0的值
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
// 唤醒线程,如上图,唤醒的是t1线程
LockSupport.unpark(s.thread);
}
2)t1被唤醒之后,我们再来看acquireQueued方法
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
// t1的prev是head
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
// 因t0已释放锁,t1获取到锁,exclusiveOwnerThread=t1, state=1
// 将head指向为当前node,即t1所在node,并把该node的prev、thread都置为null
setHead(node);
// prev节点next指向置为空,方便回收
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
// 如果是中断唤醒的,interrupted设置为true
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
t1拿到了锁,exclusiveOwnerThread值为t1, 之前创建的空Node对象没有任何指向,将会被回收掉。
如图:
3)如果t1是被其他线程中断的,parkAndCheckInterrupt()会返回true,interrupted则会设置为true。
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
// 该方法会返回是否被中断,以及清理中断状态
return Thread.interrupted();
}
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
// 被中断后,会执行该方法;
// 因为线程的中断状态已被清理,此处会再次执行中断
selfInterrupt();
}
因Thread.interrupted()方法会清理中断状态,执行selfInterrupt()方法再次中断下,这样业务逻辑执行时拿到的线程状态就是中断的。
总结下加锁和解锁过程:
1.新加入的线程会尝试获取锁(tryAcquire);
2.获取不到,则入队(addWaiter);
3.入队之后,队头会再次尝试获取锁,获取不到,则和其他线程一样暂停(parkAndCheckInterrupt);
4.有线程释放锁,唤醒队头线程(unparkSuccessor);
5.队头线程尝试获取锁,退出自旋(acquireQueued),开始执行业务逻辑。
非公平锁非公平锁和公平锁的区别:主要在于获取锁的时候要不要先判断是否已有线程在等待。非公平锁,有线程进来,直接尝试获取一次锁。
我们看下非公平锁获取锁的过程:
final void lock() {
// 先尝试获取锁
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
tryAcquire方法会调nonfairTryAcquire
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
// 同样也是先尝试获取锁
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
每次都是先尝试compareAndSetState(0, acquires)获取锁。
再来看下公平锁获取过程:
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
// 先判断队列是否有线程在等待
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
公平锁,会判断队列是否有线程在等待。
以上是ReentrantLock详解,有问题欢迎沟通。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)