AQS(AbstractQueuedSynchronizer)——源码分析

AQS(AbstractQueuedSynchronizer)——源码分析,第1张

AQS(AbstractQueuedSynchronizer)——源码分析

同步器提供的模板方法主要分为3类:

  1. 独占式获取与释放同步状态

  2. 共享式获取与释放同步状态

  3. 查询同步队列中的等待线程情况

public final void acquire(int arg) {

if (!tryAcquire(arg) &&

acquireQueued(addWaiter(java.util.concurrent.locks.AbstractQueuedSynchronizer.Node.EXCLUSIVE), arg))

selfInterrupt();

}

public final void acquireInterruptibly(int arg)

throws InterruptedException {

if (Thread.interrupted())

throw new InterruptedException();

if (!tryAcquire(arg))

doAcquireInterruptibly(arg);

}

public final boolean tryAcquireNanos(int arg, long nanosTimeout)

throws InterruptedException {

if (Thread.interrupted())

throw new InterruptedException();

return tryAcquire(arg) ||

doAcquireNanos(arg, nanosTimeout);

}

public final void acquireShared(int arg) {

if (tryAcquireShared(arg) < 0)

doAcquireShared(arg);

}

public final void acquireSharedInterruptibly(int arg)

throws InterruptedException {

if (Thread.interrupted())

throw new InterruptedException();

if (tryAcquireShared(arg) < 0)

doAcquireSharedInterruptibly(arg);

}

public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)

throws InterruptedException {

if (Thread.interrupted())

throw new InterruptedException();

return tryAcquireShared(arg) >= 0 ||

doAcquireSharedNanos(arg, nanosTimeout);

}

public final boolean release(int arg) {

if (tryRelease(arg)) {

java.util.concurrent.locks.AbstractQueuedSynchronizer.Node h = head;

if (h != null && h.waitStatus != 0)

unparkSuccessor(h);

return true;

}

return false;

}

public final boolean releaseShared(int arg) {

if (tryReleaseShared(arg)) {

doReleaseShared();

return true;

}

return false;

}

public final Collection getQueuedThreads() {

ArrayList list = new ArrayList();

for (java.util.concurrent.locks.AbstractQueuedSynchronizer.Node p = tail; p != null; p = p.prev) {

Thread t = p.thread;

if (t != null)

list.add(t);

}

return list;

}

总结​

| 方法名称 | 描述 |

| — | — |

| void acquire(int arg) | 独占式获取同步状态,如果当前线程获取同步状态成功,则由该方法返回;否则,将会进入同步等待队列等待,该方法将会调用重写的tryAcquire(int arg)方法 |

| void acquireInterruptibly(int arg) | 与acquire方法相同,但是该方法可以响应中断,当前线程未获取到同步状态进入同步队列中,如果当前线程被中断,则该方法抛出InterruptedException异常 |

| tryAcquireNanos(int arg, long nanosTimeout) | 在acquireInterruptibly方法的基础上增加了超时限制,如果当前线程在超时时间内未获取到同步状态,则返回false,获取到则返回true |

| acquireShared(int arg) | 共享式额获取同步状态,如果当前线程未获取到同步状态,将会进入同步队列等待,与独占式获取的主要区别是在同一时刻可以由多个线程同时获取到同步状态 |

| void acquireSharedInterruptibly(int arg) | 与acquireShared方法相同,该方法响应中断 |

| tryAcquireSharedNanos(int arg, long nanosTimeout) | 在acquireSharedInterruptibly的基础上增加了超时限制 |

| boolean release(int arg) | 独占式的释放同步状态,该方法会在释放同步状态之后,将同步队列中第一个节点包含的线程唤醒 |

|

《一线大厂Java面试题解析+后端开发学习笔记+最新架构讲解视频+实战项目源码讲义》

【docs.qq.com/doc/DSmxTbFJ1cmN1R2dB】 完整内容开源分享

boolean releaseShared(int arg) | 共享式的释放同步状态 |

| Collection getQueuedThreads() | 获取等待在同步队列上的线程集合 |

2.2 自定义独占锁加强AbstractQueuedSynchronizer的工作原理的理解

独占锁指的是,同一时刻只能有一个线程获取到锁,其他获取锁的线程只能处于等待队列中等待,只有获取到锁的线程释放了锁,后继的线程才能获取到锁。(不太了解的可以写一遍,基本上就懂了)

package com.lizba.p5;

import java.util.concurrent.TimeUnit;

import java.util.concurrent.locks.AbstractQueuedSynchronizer;

import java.util.concurrent.locks.Condition;

import java.util.concurrent.locks.Lock;

public class Mutex implements Lock {

private static class Sync extends AbstractQueuedSynchronizer {

@Override

protected boolean tryAcquire(int arg) {

// 当前状态如果为0则获取到锁

if (compareAndSetState(0, 1)) {

// 设置锁的占用线程为当线程

setExclusiveOwnerThread(Thread.currentThread());

return true;

}

return false;

}

@Override

protected boolean tryRelease(int arg) {

// 如果当前同步状态为0,调用该方法则抛出异常

if (getState() == 0) {

throw new IllegalMonitorStateException();

}

// 清空占用线程

setExclusiveOwnerThread(null);

// 设置共享状态为0

setState(0);

return true;

}

@Override

protected boolean isHeldExclusively() {

return getState() == 1;

}

Condition condition() {

return new ConditionObject();

}

}

// 将需要的 *** 作代理至Sync上

private Sync sync = new Sync();

@Override

public void lock() {

sync.acquire(1);

}

@Override

public boolean tryLock() {

return sync.tryAcquire(1);

}

@Override

public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {

return sync.tryAcquireNanos(1, unit.tonanos(time));

}

@Override

public void lockInterruptibly() throws InterruptedException {

sync.acquireInterruptibly(1);

}

@Override

public void unlock() {

sync.release(1);

}

@Override

public Condition newCondition() {

return sync.condition();

}

public boolean hasQueuedThreads() {

return sync.hasQueuedThreads();

}

}

总结自定义同步组件Mutex互斥锁:

通过自定义同步组件Mutex我们可以看出。Mutex定义了一个静态内部类,该内部类继承了同步器并实现了独占式获取和释放同步状态。用户在使用Mutex的时候并不会直接和内部同步器打交道,而是调用Mutex提供的方法,在Mutex的实现中以获取锁的lock()方法为例,只需要在方法实现中调用同步器的模板方法acquire(int args)即可。这种实现方法大大降低了实现一个可靠自定义同步组件的门槛。(不多说_Doug Lea牛逼_)

3、AbstractQueuedSynchronizer实现分析

分析的主要内容包括如下几个方面

  1. 同步队列

  2. 独占式同步状态获取与释放

  3. 共享式同步状态获取与释放

  4. 超时获取同步状态

3.1 同步队列

同步队列实现依赖的是内部的一个(FIFO)的同步队列来完成同步状态管理的,而这个队列的重中之重就是AbstractQueuedSynchronizer的内部类Node,这个Node节点是用来保存同步失败的线程引用、等待状态以及前驱prev和后继next节点。

节点源码重点部分解释:

static final class Node {

volatile int waitStatus;

volatile Node prev;

volatile Node next;

volatile Thread thread;

Node nextWaiter;

}

在AbstractQueuedSynchronizer的内部类代表同步队列中的节点,而AbstractQueuedSynchronizer会持有首节点(head)和尾节点(tail),获取同步状态失败的节点加入队列的尾部。

private transient volatile Node head;

private transient volatile Node tail;

通过一组图来查看AQS的结构

AQS同步队列的基本结构

  1. 同步器持有首节点和尾节点,初始都为null

  2. 获取同步状态失败的线程节点加入队列尾部

AQS同步队列加入节点

  1. 通过compareAndSetTail(Node expect, Node update)提供的CAS *** 作来正确的设置尾节点

  2. 加入尾节点成功后,将原先尾节点的后继节点指向新的尾节点,新的尾节点的前驱节点设置为原尾节点

AQS同步队列设置首节点

  1. 同步队列节点顺序出入队遵循FIFO

  2. 原先首节点释放同步状态,唤醒后继节点,后继节点获取同步状态成功后设置自己为首节点

3.2 独占式同步状态获取与释放

独占式同步状态获取通过调用同步器的acquire(int arg)获取同步状态,注意该方法不响应中断。

获取解析

acquire(int arg)源码解析

public final void acquire(int arg) {

if (!tryAcquire(arg) &&

acquireQueued(addWaiter(Node.EXCLUSIVE), arg))

selfInterrupt();

}

acquire(int arg)之addWaiter(Node mode)

private Node addWaiter(Node mode) {

// 创建一个新的节点,设置节点信息,和节点线程为当前线程

Node node = new Node(Thread.currentThread(), mode);

// 获取尾节点,当尾节点不为空的时候,尝试设置尾节点

Node pred = tail;

if (pred != null) {

// 设置当前插入节点的前驱节点为当前同步队列中的尾节点

node.prev = pred;

// 通过CAS快速设置尾节点为当前插入的节点

if (compareAndSetTail(pred, node)) {

// 如设置尾节点成功,将pred节点(同步队列上一个的尾节点,此时新的尾节点为插入节点)的后继节点指向新插入的节点(新的尾节点)

pred.next = node;

// 返回节点对象

return node;

}

}

enq(node);

return node;

}

addWaiter(Node mode)之 enq(node)

private Node enq(final Node node) {

// 不断循环,直至CAS插入节点成功

for (; {

Node t = tail;

if (t == null) {

// 当尾节点为null,此时需要初始化头节点和尾节点

if (compareAndSetHead(new Node()))

tail = head;

} else {

// 插入节点前驱节点指向原先尾节点

node.prev = t;

// CAS插入至同步队列的尾节点

if (compareAndSetTail(t, node)) {

t.next = node;

return t;

}

}

}

}

acquire(int arg)之 acquireQueued(final Node node, int arg)

final boolean acquireQueued(final Node node, int arg) {

boolean failed = true;

try {

boolean interrupted = false;

// 不断循环

for (; {

// 获取当前节点的前驱节点,如果前驱节点为null将会抛出空指针异常

final Node p = node.predecessor();

// 如果当前节点的前驱节点是头节点,尝试获取同步状态

if (p == head && tryAcquire(arg)) {

// 设置当前节点为头节点,并且将节点线程和节点的前驱节点置为null,help GC

setHead(node);

p.next = null; // help GC

failed = false;

return interrupted;

}

// 如果不符合条件,则判断当前节点前驱节点的waitStatus状态来决定是否需要挂起LockSupport.park(this);

if (shouldParkAfterFailedAcquire(p, node) &&

parkAndCheckInterrupt())

interrupted = true;

}

} finally {

// 失败则取消

if (failed)

cancelAcquire(node);

}

}

为何当且仅当当前节点的前驱节点是头节点才能尝试获取同步状态?

我们先看一张节点自旋获取同步状态的图,再总结原因

  1. 头节点是成功获取同步状态的节点,头节点线程释放同步状态之后会唤醒后继节点,后继节点的线程被唤醒后需要检查自己的前驱节点是否是头节点

  2. 保持并维护同步队列FIFO的出队入队原则

acquire(int arg) 执行时序图

3.2 release

当同步状态获取成功之后,当前线程从acquire(int arg)方法返回,执行响应逻辑执行,需要释放同步状态,是的后续节点能够继续获取同步状态。释放同步状态调用的是release(int arg) 方法,该方法释放同步状态之后,会唤醒其他后继节点。

源码解析:

public final boolean release(int arg) {

// 尝试释放同步状态

if (tryRelease(arg)) {

Node h = head;

// 如果头节点不为空,并且头节点的等待状态waitStatus不为初始状态0,此时判断为仍存在后继节点

if (h != null && h.waitStatus != 0)

// 使用LockSupport来唤醒处于等待状态的线程

unparkSuccessor(h);

return true;

}

return false;

}

3.3 独占式同步状态获取与释放总结
  1. 在获取同步状态时,同步器维护一个同步队列,获取状态失败的线程都会被加入到队列中并且在队列中进行自旋

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5693034.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存