并发编程是Java中非常关键的基本功,在Java中并发编程包括的知识点如下图,非常之多,锁是保证并发的一种手段之一,本文将围绕Aqs,ReentrantLock,ReentrantReadWriteLock的实现来分析锁设计的思想。
Jdk为什么需要设计Lock?
使用了Sychronized之后,发现Sychronized提供的锁功能存在一些短板,比如不能取消,不可以等待,不可以中断,而Lock就弥补了Sychronized的这些缺陷,下面是区别,所以Lock有他存在的必要性。
Lock与Sychronized区别、特性
功能方面
1、Synchronized Jvm关键字实现 互斥锁实现。monitorenter和monitorexit
2、Synchronized 使用范围包括 *** 作方法,对象,同步代码块
3、Synchornized 可重入 不可中断 不可等待取消 非公平,而Lock是可重入,可中断,可等待取消,支持公平/非公平(默认)
4、Synchronized在线程发生异常时会自动释放锁,因此不会发生异常死锁。Lock异常时不会自动释放锁,所以需要在finally中实现释放锁。
5、Lock是可以中断锁,Synchronized是非中断锁,必须等待线程执行完成释放锁。
6、Lock可以使用读锁提高多线程读效率,如ReentrantLock。
7、在Lock中可以使用多组Condition实现多组线程多条件通信,而Sychronized使用wait/notify只能针对一组线程通信。
在实现方面
1、都使用了cas+volatile+自旋。
2、排队抢锁的线程,sychronized使用了两个队列,而reentrantlock只使用了一个队列。
3、sychronized有自适应自旋,而Lock没有,Lock只有自旋和park.
在分析ReentrantLock之前,有必要对aqs进行了解,因为ReentrantLock是基于aqs的框架实现。
aqs(AbstractQueuedSynchronizer)基础组件提供的能力
1、内部维护了Node类型的双向链表,用来存储等待线程的,以及等待线程的状态,提供线程排队基础。
`Node 有 5 中状态,分别是:`
CANCELLED(1): 在同步队列中等待的线程等待超时或被中断,需要从同步队列中取消该 Node 的结点,其结点的 waitStatus为CANCELLED,即结束状态,进入该状态后的结点将不会再变化
SIGNAL(-1): 只要前置节点释放锁,就会通知标识为SIGNAL状态的后续节点的线程抢占锁
CONDITION(-2):和 Condition 有关系
PROPAGATE(-3):共享模式下,PROPAGATE状态的线程处于可运行状态
0:初始状态
2、一个volatile int state字段,用来表示锁的状态和锁的重入次数 0无锁,>0加锁,提供了get/set/compareAndSwapInt方法来 *** 作state状态。
3、等待线程入队。
4、唤醒后继节点。
5、加锁算法定义。
6、释放独占锁,共享锁算法定义,提供子类重写方法,实现不同的同步组件
7、提供了一些监控锁的方法,等待线程队列长度,排队中的线程,
8、condition辅助类
aqs是对锁的通用功能进行抽象,子类通过继承aqs实现同步的功能。其中ReentrantLock就是基于aqs实现的可重入独占锁。
ReentrantLock类结构图如下:
支持公平锁和非公平锁。默认是非公平锁
非公平锁设计思想
1、一请求就尝试获取锁
2、获取锁失败,继续尝试一定次数来获取锁
3、获取锁成功,通过cas实现state字段值原子自增,exclusiveOwnerThread设置成当前线程。
4、如果已经获取到了锁,state自增1,实现可重入特性,当前线程对应的节点设置成头节点。
5、如果尝试获取锁失败,将线程加入队列,并阻塞当前线程
如下是加锁成功的代码。
非公平锁获取锁分四个阶段,详细流程如下
第一阶段,快速占用锁阶段
这个阶段就是一调用lock方法,就直接通过cas设置一次state字段加锁,不管有前面没有线程获取锁,这就是体现了`非公平`的地方。
抢锁入口:
static final class NonfairSync extends Sync { private static final long serialVersionUID = 7316153563782823691L; final void lock() { //通过cas快速加锁 if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else //进入正常加锁流程 acquire(1); } protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); } }
第二阶段,正常的加锁阶段
* 这个阶段和第一阶段类似,也是使用cas尝试加一次锁,多了重入锁的判断,如果当前锁已经被当前线程占有了,state自增,体现了`重入锁`特性。
final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { //尝试快速加一次锁 if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } //重入锁判断和获取 else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }
第三阶段,当前线程加入队列,排队
这个阶段,是线程入队列,之前尝试两次都没抢到锁,需要排队抢锁
//线程加入等待队列private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail; //尝试快速加入队列尾部 if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } //正常入队列 enq(node); return node; } private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { // Must initialize 初始化队头(空节点) if (compareAndSetHead(new Node())) tail = head; } else { //加入队尾 首先新节点 前置节点执行老尾节点 node.prev = t; //新的尾节点 设置成 新节点 if (compareAndSetTail(t, node)) { //老尾节点 指向 新节点 t.next = node; return t; } } } }
第四阶段,自旋抢站锁阶段
自旋,根据上一个节点状态来判断是否可以抢占锁,还是park阻塞
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; //自旋 for (;;) { final Node p = node.predecessor(); //如果前驱节点是头节点,则具备抢锁资格 if (p == head && tryAcquire(arg)) { //抢锁成功了,自己作为头节点 setHead(node); //帮助老的头节点回收 p.next = null; // help GC failed = false; return interrupted; } //是否需要park阻塞,前一个节点状态是SIGNAL就要park, if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) //park interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
如果达到了park条件,则可能进入park,
private final boolean parkAndCheckInterrupt() {//调用unsafe类的park方法 阻塞在这里,等待获取锁的线程释放锁并唤醒 我 然后继续执行 LockSupport.park(this); return Thread.interrupted(); }
公平/非公平锁释放锁流程
非公平锁的释放锁设计思想
unlock方法调用一次将state减少1
如果state变成了0,则exclusiveOwnerThread设置成null
释放锁成功后,需要将等待获取锁的线程唤醒
protected final boolean tryRelease(int releases) { int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) { free = true; setExclusiveOwnerThread(null); } setState(c); return free; }
如果头节点状态不是0,表示需要唤醒头节点的下一个节点
public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; //唤醒下一个节点 if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; } private void unparkSuccessor(Node node) { int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); Node s = node.next; //下一个节点是空的,`从尾节点往前找`,最接近head的状态小于0的线程节点 if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } //指向unpark if (s != null) LockSupport.unpark(s.thread); }
上面为啥head.next是空的,还有从尾巴节点遍历找需要唤醒的节点由于入队 *** 作,先构建尾节点,再将老尾节点指向新节点,next链比prev链延迟构建
private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { // Must initialize if (compareAndSetHead(new Node())) tail = head; } else { //和尾巴节点 node.prev = t; if (compareAndSetTail(t, node)) { //最后一步才和尾节点关联 t.next = node; return t; } } }}
以上分析的是非公平锁,公平锁与非公平锁释放锁流程一致,
公平锁和非公平锁加锁流程有什么不同?
final void lock() { acquire(1); }
少了非公平锁的快速抢占锁阶段,在正常获取锁之前,加入hasQueuedPredecessors判断,是否有前驱节点,其它步骤一样,
经过排队抢锁。
protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; } }
//查询是否有任何线程在等待获取比当前线程更长的时间
```java
public final boolean hasQueuedPredecessors() {
Node t = tail;
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
```
Condition的设计思想
类似于Object的wait方法,notify方法,实现线程之间的通信。
在生产者消费者模型的例子里:
生产者线程发现消息队列满了,则调用await方法阻塞,
生产者线程如果生产消息成功,则调用signal方法通知消费者线程进行消费
消费者线程发现消息队列空了,则调用await方法阻塞,
消费者线程消费消息成功,则调用signal方法通知生产者进行生产
ConditionObject 设计了两个节点 头节点 尾节点 维护了一个单向链表,维护等待条件的线程链路。
public class ConditionObject implements Condition { private transient Node firstWaiter; private transient Node lastWaiter; }
Condition的await方法设计思想
当前线程需要通过LockSupport.park方法阻塞,释放cpu资源,并且通过调用release方法释放锁,让其他线程可以获取到锁,并且把当前线程存起来,让其他线程唤醒,在阻塞的方法后面,如果被唤醒,需要重新获取锁资源
public final void await() throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); //当前线程加入等待队列 Node node = addConditionWaiter(); //释放锁资源,让其他线程有机会获得锁 int savedState = fullyRelease(node); int interruptMode = 0; //阻塞 while (!isonSyncQueue(node)) { LockSupport.park(this); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } //阻塞被唤醒后再次获取锁 if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); }
Conditon的signal方法设计思想
需要将第一个等待需要唤醒的线程进行唤醒,并且加入同步队列
public final void signal() { if (!isHeldExclusively()) throw new IllegalMonitorStateException();//获得第一个等待被唤醒的线程节点 Node first = firstWaiter; if (first != null)//执行唤醒 *** 作 doSignal(first); } private void doSignal(Node first) { do { if ( (firstWaiter = first.nextWaiter) == null) lastWaiter = null; first.nextWaiter = null; } while (!transferForSignal(first) && (first = firstWaiter) != null); } final boolean transferForSignal(Node node) { if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) return false; //将需要被唤醒的节点加入同步队列 Node p = enq(node); int ws = p.waitStatus; //实现唤醒 *** 作 if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) LockSupport.unpark(node.thread); return true; }
针对中断获取锁,直接抛出中断一次
public final void acquireInterruptibly(int arg) throws InterruptedException { //中断状态抛异常 if (Thread.interrupted()) throw new InterruptedException(); if (!tryAcquire(arg)) doAcquireInterruptibly(arg); } private void doAcquireInterruptibly(int arg) throws InterruptedException { final Node node = addWaiter(Node.EXCLUSIVE); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) //中断状态抛异常 throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } } 取消获取锁的线程private void cancelAcquire(Node node) { // Ignore if node doesn't exist if (node == null) return; node.thread = null; // Skip cancelled predecessors Node pred = node.prev; while (pred.waitStatus > 0) node.prev = pred = pred.prev; Node predNext = pred.next; // 设置为取消状态 node.waitStatus = Node.CANCELLED; // 节点在尾处 直接去除 置null if (node == tail && compareAndSetTail(node, pred)) { compareAndSetNext(pred, predNext, null); } else { // 节点去除 int ws; if (pred != head && ((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && pred.thread != null) { Node next = node.next; if (next != null && next.waitStatus <= 0) compareAndSetNext(pred, predNext, next); } else { //唤醒下一个节点 unparkSuccessor(node); } node.next = node; // help GC } }
最后再来一波总结:
Lock和Sychronized在非公平锁的加锁解锁的实现上差异?
1、快速加锁阶段,lock通过cas,设置state变量,再设置独占线程
而sychronized是通过cas设置对象头上线程id.(偏向锁阶段)
2、lock通过 cas一次设置state变量 而sychronized设置锁记录指针到对象头(轻量级别锁)
3、lock 先排队,通过自旋 + cas + park 设置state字段,再设置线程对象。而sychronized通过 自旋 + cas + 排队 + park 设置线程id(重量级锁),lock中只有一个队列用户抢锁排队,而sychronized设计了两个队列,2q算法,减少对一个队列的并发处理。
4、wait/notify和signal/await的设计思想类型,都使用了一个队列存储等待的线程。结合加解锁api来实现线程通信。
ReentrantReadWriteLock可重入读写锁的实现解析
基于ReentrantLock主要改造点
1、读写锁的标志,通过32位整形变量,高16位存读锁标志,低16位存写锁标志。
2、ReentrantReadWriteLock有读和写两把锁,写锁和ReentrantLock实现方式类似,因为他们都是独占锁,而读锁,ReentrantReadWriteLock使用了ThreadLocal对不同线程获取读锁的次数进行隔离,加锁 *** 作,是否锁 *** 作互不影响。
写锁获取源码,和ReentrantLock流程类似
protected final boolean tryAcquire(int acquires) { Thread current = Thread.currentThread(); int c = getState(); int w = exclusiveCount(c); //锁状态不为0,已经加了读锁或写锁 if (c != 0) { //写锁为0或者线程不是当前线程,其它线程获取了读锁,则本次获取写锁失败 if (w == 0 || current != getExclusiveOwnerThread()) return false; //锁次数是否会超过上限 if (w + exclusiveCount(acquires) > MAX_COUNT) throw new Error("Maximum lock count exceeded"); //已经获取了写锁,重入次数自增 // Reentrant acquire setState(c + acquires); return true; } //写锁是否要阻塞,如果是非公平不需要阻塞,公平要阻塞 if (writerShouldBlock() || !compareAndSetState(c, c + acquires)) return false; setExclusiveOwnerThread(current); return true; }
//与运算
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;// 0000 0000 0000 0001 左移16位,再减1//0001 0000 0000 0000 0000//0000 1111 1111 1111 1111 ==写锁的标记//计算写锁的数量static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
写锁释放
和Reentrantlock的释放逻辑一致
修改状态,唤醒下一个线程
读锁获取
//最后一个线程 获取读锁 的计数器private transient HoldCounter cachedHoldCounter;protected final int tryAcquireShared(int unused) { Thread current = Thread.currentThread(); int c = getState(); //写锁被其它线程占有了,失败 if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current) return -1; int r = sharedCount(c); //是否要阻塞 if (!readerShouldBlock() && r < MAX_COUNT && compareAndSetState(c, c + SHARED_UNIT)) { //第一个读锁线程,专有字段进行记录 if (r == 0) { firstReader = current; firstReaderHoldCount = 1; } else if (firstReader == current) { //第一个获取读锁线程,重入 firstReaderHoldCount++; } else { //不是第一个获取读锁线程,不是当前线程 HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) cachedHoldCounter = rh = readHolds.get(); else if (rh.count == 0) readHolds.set(rh); rh.count++; } return 1; } return fullTryAcquireShared(current); }
### 读锁释放
protected final boolean tryReleaseShared(int unused) { Thread current = Thread.currentThread(); //对获取第1个的线程进行判断,释放锁,缓存第一个获取锁的线程和计数,加快释放锁的速度 if (firstReader == current) { // assert firstReaderHoldCount > 0; if (firstReaderHoldCount == 1) firstReader = null; else firstReaderHoldCount--; } else { //最后一个获取锁的计数器,缓存,也是为了最后一个获取锁的线程进行快速释放锁 HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) rh = readHolds.get(); int count = rh.count; if (count <= 1) { readHolds.remove(); if (count <= 0) throw unmatchedUnlockException(); } --rh.count; } //释放锁,最后需要修改state状态 for (;;) { int c = getState(); int nextc = c - SHARED_UNIT; if (compareAndSetState(c, nextc)) // Releasing the read lock has no effect on readers, // but it may allow waiting writers to proceed if // both read and write locks are now free. return nextc == 0; } }
本文分析了ReentrantLock的设计思想,包括非公平和公平获取锁,释放锁,还有Condition的实现,最后和ReentrantLock的设计进行了分析对比,希望可以加深对锁设计的理解。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)