一、AQS
全称是AbstractQueuedSynchronizer,是阻塞式锁和相关的同步器工具的框架,特点是:用state属性表示资源的状态(分独占模式和共享模式),子类需要定义如何维护这个状态,控制如何获取锁和释放锁
1、getState-获取state状态;
2、setState-设置state状态;
3、compareAndSetState-cas机制设置state状态;
4、独占模式是只有一个线程能够访问资源,而共享模式可以允许多个线程访问资源;
5、提供了基于FIFO的等待队列,类似于Monitor的EntryList;
6、条件变量来实现等待,唤醒机制,支持多个条件变量,类似于Monitor的WaitSet;
子类主要实现这样一些方法(默认抛出UnsupportedOperationException) tryAcquire tryRelease tryAcquireShared tryReleaseShared isHeldExclusively
基于AQS实现不可重入锁:
package com.concurrent.test; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.AbstractQueuedSynchronizer; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; public class TestAqs { public static void main(String[] args) { MyLock lock = new MyLock(); new Thread(() -> { lock.lock(); try{ System.out.println("locking..."); Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } finally { System.out.println("unlocking..."); lock.unlock(); } }, "t1").start(); new Thread(() -> { lock.lock(); try{ System.out.println("locking..."); }finally { System.out.println("unlocking..."); lock.unlock(); } }, "t2").start(); } } // 自定义锁(不可重入锁) class MyLock implements Lock { // 独占锁 同步器类 class Mysync extends AbstractQueuedSynchronizer{ @Override protected boolean tryAcquire(int arg) { if(compareAndSetState(0, 1)){ // 加上了锁, 并设置owner为当前线程 setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; } @Override protected boolean tryRelease(int arg) { // 注意二者顺序,volatile后变量增加写屏障,使它之后的值看到前面的最新值 setExclusiveOwnerThread(null); setState(0); return true; } @Override // 是否持有独占锁 protected boolean isHeldExclusively() { return getState() == 1; } public Condition newCondition(){ return new ConditionObject(); } } private Mysync sync = new Mysync(); @Override // 加锁(不成功会进入等待队列) public void lock() { sync.acquire(1); } @Override // 加锁,可打断 public void lockInterruptibly() throws InterruptedException { sync.acquireInterruptibly(1); } @Override // 尝试加锁(一次) public boolean tryLock() { return sync.tryAcquire(1); } @Override // 尝试加锁,带超时 public boolean tryLock(long time, TimeUnit unit) throws InterruptedException { return sync.tryAcquireNanos(1, unit.toNanos(time)); } @Override // 解锁 public void unlock() { sync.release(1); } @Override public Condition newCondition() { return sync.newCondition(); } }
二、ReentrantLock原理
加锁解锁流程
默认非公平实现
public ReentrantLock() { sync = new NonfairSync(); }
NonfairSync继承自AQS
非公平锁加锁解锁
final void lock() { if (compareAndSetState(0, 1)) // 直接竞争锁成功(一次加锁成功) setExclusiveOwnerThread(Thread.currentThread()); else // 加锁失败,走acquire方法 acquire(1); } public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { // 获取当前节点的前驱节点 final Node p = node.predecessor(); // 若前驱节点为head节点则处于第二位,则再次尝试是否能够获取锁 if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } } -------------------------------------------------------- // 释放锁 // ReentrantLock.unlock public void unlock() { sync.release(1); } // aqs public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; } // ReentrantLock protected final boolean tryRelease(int releases) { int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) { free = true; setExclusiveOwnerThread(null); } setState(c); return free; }
没有竞争时
第一个竞争出现时
Thread-1 执行了
1、cas尝试将state由0改为1,结果失败;
2、进入tryAcquire逻辑,这时state已经是1,结果仍然失败;
3、接下来进入addWaiter逻辑,构造Node队列:
- 图中黄色三角表示该Node的waitStatus状态,其中0为默认正常状态;
- Node的创建是懒惰的;
- 其中第一个Node称为Dummy(哑元)或哨兵,用来占位,并不关联线程;
当前线程进入acquireQueued逻辑
1、acquireQueued会在一个死循环中不断尝试获取锁,失败后进入park阻塞;
2、如果自己是紧邻着head(排第二位),那么再次tryAcquire尝试获取锁,当然这时state仍为1,失败;
3、进入shouldParkAfterFailedAcquire逻辑,将前驱node,即head的waitStatus改为-1,这次返回false;
4、shouldParkAfterFailedAcquire执行完毕回到acquireQueued,再次tryAcquire尝试获取锁,当然这时state仍为1,失败;
5、当再次进入shouldParkAfterFailedAcquire时,这时因为其前驱node的waitStatus已经是-1,这次返回true;
6、进入parkAndCheckInterrupt,Thread-1 park(灰色表示)
再次有多个线程经历上述过程竞争失败,变成这个样子
Thread-0释放锁,进入tryRelease流程,如果成功 - 设置exclusiveOwnerThread为null;
- state = 0;
当前队列不为null,并且head的waitStatus=-1,进入unparkSuccessor流程:
1、 找到队列中离head最近的一个Node(没取消的),unpark恢复其运行,本例中即为Thread-1;
2、回到Thread-1的acquireQueued流程;
如果加锁成功(没有竞争),会设置 - exclusiveOwnerThread 为Thread-1,state = 1;
- head指向刚刚Thread-1所在的Node,该Node清空Thread;
- 原本的head因为从链表断开,而可被垃圾回收;
如果这时候有其他线程来竞争(非公平的体现),例如这时有Thread-4来了
如果不巧又被Thread-4占了先 - Thread-4被设置为exclusiveOwnerThread,state = 1;
- Thread-1再次进入acquireQueued流程,获取锁失败,重新进入park阻塞;
可重入原理
final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } // 如果已经获得了锁,线程还是当前线程,表示发生了锁重入 else if (current == getExclusiveOwnerThread()) { // state++ int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; } protected final boolean tryRelease(int releases) { int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; // 支持锁重入,只有state减为0,才释放成功 if (c == 0) { free = true; setExclusiveOwnerThread(null); } setState(c); return free; }
可打断原理
不可打断模式:
在此模式下,即使它被打断,仍会驻留在AQS队列中,等获得锁后方能继续运行(是继续运行!只是打断标记被设置为true)。
private final boolean parkAndCheckInterrupt() { // 如果打断标记已经是true, 则park会失效 LockSupport.park(this); // interrupted会清楚打断标记(这里保证一个线程可以多次park在这里) return Thread.interrupted(); } final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; // 还是需要获得锁后,才能返回打断状态(因此,在没获取到锁之前,打断是无效的,会多次进入parkAndCheckInterrupt中park) return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) // 如果是因为interrupt被唤醒, 返回打断状态为true interrupted = true; } } finally { if (failed) cancelAcquire(node); } } public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) // 重新产生一次中断 selfInterrupt(); }
可打断模式:
public final void acquireInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); // 如果没有获得到锁, 进入(一) if (!tryAcquire(arg)) doAcquireInterruptibly(arg); } // (一) 可打断的获取锁流程 private void doAcquireInterruptibly(int arg) throws InterruptedException { final Node node = addWaiter(Node.EXCLUSIVE); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) // 在 park 过程中如果被 interrupt 会进入此 // 这时候抛出异常,而不会再次进入for(;;) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }
因此,不可打断只重置打断标记,没获取到锁钱,会再次进入死循环park,所以打断无效,而可打断锁通过抛出异常,不用再进入死循环,从而实现可打断。
公平锁实现原理
非公平锁实现
final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); // 如果还没有获得锁 if (c == 0) { // 尝试用cas获得,这里体现了非公平性:不去检查AQS队列 if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } // 如果已经获得了锁,线程还是当前线程,表示发生了锁重入 else if (current == getExclusiveOwnerThread()) { // state++ int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } // 获取失败,返回调用处 return false; }
公平锁实现
// 与非公平锁主要区别在于tryAcquire方法的实现 protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { // 先检查AQS队列中是否有前驱节点,没有才去竞争 if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; } // 队列中是否有节点 public final boolean hasQueuedPredecessors() { // The correctness of this depends on head being initialized // before tail and on head.next being accurate if the current // thread is first in queue. Node t = tail; // Read fields in reverse initialization order Node h = head; Node s; // h != t 时表示队列中有Node return h != t && // (s = h.next) == null表示队列中还没有老二 ((s = h.next) == null || // 或者队列中老二线程不是此线程 s.thread != Thread.currentThread()); }
条件变量实现原理
每个条件变量其实就对应着一个等待队列,其实现类是ConditionObject
await流程
开始Thread-0持有锁,调用await,进入ConditionObject 的 addConditionWaiter流程,创建新的Node状态为 -2 (Node.CONDITION),关联Thread-0,加入等待队列尾部
接下来进入AQS的 fullyRelease 流程,释放同步器上的锁
unpark AQS 队列中的下一个节点,竞争锁,假设没有其他竞争线程,那么 Thread-1 竞争成功
park 阻塞 Thread-0
public final void await() throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); // 进入ConditionObject 的 addConditionWaiter流程,创建新的Node状态为 -2 (Node.CONDITION),关联Thread-0,加入等待队列尾部 Node node = addConditionWaiter(); // 释放锁 int savedState = fullyRelease(node); int interruptMode = 0; while (!isOnSyncQueue(node)) { // 当前线程进入park, 等待被唤醒 LockSupport.park(this); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); } private Node addConditionWaiter() { Node t = lastWaiter; // If lastWaiter is cancelled, clean out. if (t != null && t.waitStatus != Node.CONDITION) { unlinkCancelledWaiters(); t = lastWaiter; } Node node = new Node(Thread.currentThread(), Node.CONDITION); if (t == null) firstWaiter = node; else t.nextWaiter = node; lastWaiter = node; return node; } final int fullyRelease(Node node) { boolean failed = true; try { int savedState = getState(); // 可重入锁一次释放 if (release(savedState)) { failed = false; return savedState; } else { throw new IllegalMonitorStateException(); } } finally { if (failed) node.waitStatus = Node.CANCELLED; } }
signal流程
假设 Thread-1 要来唤醒 Thread-0
进入 ConditionObject 的 doSignal流程,取得等待队列中第一个 Node,即 Thread-0 所在Node
执行 transferForSignal 流程,将该 Node 加入 AQS 队列尾部,将 Thread-0 的waitStatus 改为0,Thread-3 的waitStatus 改为-1
Thread-1 释放锁,进入 unlock 流程
public final void signal() { // 当前线程是否持有锁 if (!isHeldExclusively()) throw new IllegalMonitorStateException(); // 取conditionObject中第一个节点 Node first = firstWaiter; if (first != null) // 执行doSignal方法 doSignal(first); } private void doSignal(Node first) { do { // 若队列中只有一个节点,将尾节点也置为空 if ( (firstWaiter = first.nextWaiter) == null) lastWaiter = null; first.nextWaiter = null; // 节点转移成功,则退出, 转移失败且队列不为空, Node后移, 继续尝试 } while (!transferForSignal(first) && (first = firstWaiter) != null); } final boolean transferForSignal(Node node) { // 将当前节点的状态由 -2 改为 0 if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) return false; // 将当前节点加入 aqs 队列尾部, 并返回当前节点的前驱节点 Node p = enq(node); int ws = p.waitStatus; // 若前驱节点waitStatus > 0 或 前驱节点waitStatus修改为-1成功, 则返回成功 if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) // 否则, 进入unpark流程 LockSupport.unpark(node.thread); return true; }
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)