Juc并发编程06——深入剖析队列同步器AQS源码

Juc并发编程06——深入剖析队列同步器AQS源码,第1张

我们看下Reentrantock的源码。

   public void lock() {
        sync.lock();
    }

   public void unlock() {
        sync.release(1);
    }

原来lock,unlock等核心方法都是通过sync来实现的。而sync其实是它的一个内部类。

abstract static class Sync extends AbstractQueuedSynchronizer {...}

这个内部类继承了AbstractQueuedSynchronizer,也就是我们今天要重点介绍的队列同步器AQS。它其实就是我们锁机制的基础,它封装了包括锁的获取、释放以及等待队列。

线程的调度其关键就在于等待队列,其数据结构就是双向链表,可以参考下图。

我们先来了解以下每个Node有什么内容,点开AQS的源码,它定义了内部类Node

static final class Node {
        // 每个节点分为独占模式和共享模式、分别适用于独占锁和共享锁    
        static final Node SHARED = new Node();
        static final Node EXCLUSIVE = null;

        // 定义等待状态
        // CANCELLED:唯一一个数值大与0的状态,说明此节点已经被取消
        static final int CANCELLED =  1;
        // 此节点后面的节点被挂起,进入等待状态
        static final int SIGNAL    = -1;
        // 在条件队列中的状态
        static final int CONDITION = -2;
        // 传播,一般用于共享锁
        static final int PROPAGATE = -3;

        volatile int waitStatus; //等待状态值
        volatile Node prev; //双向链表基本 *** 作
        volatile Node next;
        volatile Thread thread; //每一个线程可以封装到一个节点进入等待队列

        Node nextWaiter; //在等待队列中表示模式,在条件队列中表示下一个节点

       // 判断是否为共享节点
        final boolean isShared() {
            return nextWaiter == SHARED;
        }

       // 返回前驱节点
        final Node predecessor() throws NullPointerException {
            Node p = prev;
            if (p == null)
                throw new NullPointerException();
            else
                return p;
        }

        Node() {   // 初始化建立节点或共享标记(Used to establish initial head or SHARED marker)
        }

        Node(Thread thread, Node mode) {     // 等待队列使用
            this.nextWaiter = mode;
            this.thread = thread;
        }

        Node(Thread thread, int waitStatus) { // 条件对立使用
            this.waitStatus = waitStatus;
            this.thread = thread;
        }
    }

再跳出NodeAQS,它定义了三个属性,head,tail默认为null,state默认为0,并且AQS的构造器并没有给它们赋值。

    private transient volatile Node head;
    private transient volatile Node tail;
    private volatile int state; // 当前锁的状态

实际上,双向链表初始化是在实际使用时完成的,后面将演示使用。看看其中一个置状态的 *** 作。

  protected final boolean compareAndSetState(int expect, int update) {
        return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
    }

原来是通过unsafecompareAndSwapInt()实现的。这个是CAS算法。不妨看看unsafe,它也是内部的属性。

    private static final Unsafe unsafe = Unsafe.getUnsafe();
    private static final long stateOffset;
    private static final long headOffset;
    private static final long tailOffset;
    private static final long waitStatusOffset;
    private static final long nextOffset;

	// 找到各个属性锁在的内存地址(相对于unsafe类的偏移地址)
    static {
        try {
            stateOffset = unsafe.objectFieldOffset
                (AbstractQueuedSynchronizer.class.getDeclaredField("state"));
            headOffset = unsafe.objectFieldOffset
                (AbstractQueuedSynchronizer.class.getDeclaredField("head"));
            tailOffset = unsafe.objectFieldOffset
                (AbstractQueuedSynchronizer.class.getDeclaredField("tail"));
            waitStatusOffset = unsafe.objectFieldOffset
                (Node.class.getDeclaredField("waitStatus"));
            nextOffset = unsafe.objectFieldOffset
                (Node.class.getDeclaredField("next"));

        } catch (Exception ex) { throw new Error(ex); }
    }

    /**
     * CAS  *** 作头节点
     */
    private final boolean compareAndSetHead(Node update) {
        return unsafe.compareAndSwapObject(this, headOffset, null, update);
    }

    /**
     * CAS  *** 作尾节点
     */
    private final boolean compareAndSetTail(Node expect, Node update) {
        return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
    }

    /**
     * CAS  *** 作WaitStatus属性
     */
    private static final boolean compareAndSetWaitStatus(Node node,
                                                         int expect,
                                                         int update) {
        return unsafe.compareAndSwapInt(node, waitStatusOffset,
                                        expect, update);
    }

    /**
     * CAS  *** 作next属性 
     */
    private static final boolean compareAndSetNext(Node node,
                                                   Node expect,
                                                   Node update) {
        return unsafe.compareAndSwapObject(node, nextOffset, expect, update);
    }

其实,Unsafe里面调用的都是native方法,读者可以自己点进去看看。它会直接找到属性的内存地址, *** 作内存中的数据,效率比较高。在AQS中静态块计算了各个属性的相对于类的偏移地址,并且在调用Unsafe中的方法时会将偏移地址传过去哦。

并且我们回过头看看,这里unsafe *** 作的属性在被定义时都是定义为volatile修饰,这是因为他们在被修改时都是使用的CAS算法,我们要使用vilotile修饰保证其可见性。


    private volatile int state; // 当前锁的状态

现在我们已经大致了解了AQS的底层机制,接着来看看它到底时如何被使用的。先看看它可以被重写的五个方法吧。

    // 独占式获取同步状态,查看同步状态是否与参数一致,如果没有问题则通过CAS设置同步状态并返回true
    protected boolean tryAcquire(int arg) {
        throw new UnsupportedOperationException();
    }

   // 独占式释放同步状态
    protected boolean tryRelease(int arg) {
        throw new UnsupportedOperationException();
    }

   // 共享式获取同步状态,返回值大于0表示成功,否则失败
    protected int tryAcquireShared(int arg) {
        throw new UnsupportedOperationException();
    }

   // 共享式释放同步状态
    protected boolean tryReleaseShared(int arg) {
        throw new UnsupportedOperationException();
    }

   // 是否在独占模式下被当前线程占有(是否当前线程持有锁)
    protected boolean isHeldExclusively() {
        throw new UnsupportedOperationException();
    }

现在我们以ReentantLock的公平锁为例,看看它怎么被重写的。

 static final class FairSync extends Sync {
        private static final long serialVersionUID = -3000897897090466540L;

        final void lock() {
            acquire(1);
        }
        ...
}

先看看lock方法。调AQSacquire()方法。里面会调用AQS中定义的tryAquire方法。而在ReentrantLocktryAuire方法公平锁与非公平锁的实现不同,其具体内容我们暂时略过。这里使用短路&&运算,如果拿到锁了,就不会走后面的逻辑。否则会调用acquireQueued,其内部调用了addWaiter。这就是说如果其它线程持有锁,就会把当前节点加入等待队列中。

    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) //节点为独占模式,EXCLUSIVE
            selfInterrupt();
    }

跟到addWaiter中看看。

    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // 先尝试用CAS直接入队,如果CAS入队成功,则return
        Node pred = tail;
        if (pred != null) { //初始状态tail尾节点未赋值会指null,如果不为空说明有其它节点插入了
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        // CAS失败(其它线程也在获取锁或者tail节点为空无法cas)
        enq(node);
        return node;
    }

上面的注释写的很清楚,我们接着看看enq怎么实现的,它其实是AQS的一个自旋机制。

   private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // 说明头尾节点没有初始化
                if (compareAndSetHead(new Node())) //新建空节点置为头节点
                    tail = head; //头尾节点指向同一个节点
            } else {
                node.prev = t; //队列插入节点 *** 作,把当前节点的prev指向尾节点
                if (compareAndSetTail(t, node)) { //设置队列的尾节点为刚插入的当前节点
                    t.next = node;
                    return t;
                }
            }
        }
    }

addWaiter终于看完了,再退回去看下,它的结果会作为参数传给acquireQueued,我们接着来看下acquireQueued。它在得到返回的节点时也会进入自旋状态(入等待队列成功,准备排队获取锁了)。

其过程可以结合下图理解。
具体代码如下。

  final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true; // 成功与否标记,初始置为true
        try {
            boolean interrupted = false; //中断标记
            for (;;) {
                final Node p = node.predecessor(); //获取当前已经插入节点的前驱节点
                if (p == head && tryAcquire(arg)) { //如果前驱节点是头节点,说明当前节点位于队首(上图节点1),会调用tryAcquire抢锁
                    setHead(node); //抢锁成功,节点出队
                    p.next = null; // 建议 GC
                    failed = false;
                    return interrupted; //正常返回,未在等待队列中被中断
                }
                // 当前节点不是队首节点,将当前节点的前驱节点的等待状态设置为signal(siganl含义:siganl状态的节点下一个节点处于等锁状态)。如果设置失败进行下一轮循环,否则往下执行
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

  private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this); // 通过Unsafe类 *** 作底层挂起线程(直接进入阻塞状态,也就是等待锁的状态)
        return Thread.interrupted();
    }

再来看看shouldParkAfterFailedAcquire的具体逻辑。

 private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL) // 如果前驱节点已经是signal,则直接返回true
            return true;
        if (ws > 0) { // ws>0表示前驱节点已经被取消,不能是被取消的节点,向前遍历直到找到第一个未被取消的节点
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node; //把被取消的节点全部抛弃
        } else {
        	   //前驱节点不是signal, 使用CAS设置为signal
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false; // 返回false,直接进入下一轮,判断CAS是否成功将前驱节点状态设置为signal
    }

在上面的代码分析过程中,我们频繁看到park,unpark方法,他们的作用是将线程挂起,和解除线程的挂起状态。看下列示例代码。

public class Demo22 {
    public static void main(String[] args) {
        Thread t = Thread.currentThread();
        new Thread(() -> {
            try {
                TimeUnit.SECONDS.sleep(1);
                System.out.println("unpark thread t");
                LockSupport.unpark(t);
//                thread.interrupt();
           } catch (InterruptedException e) {

            }
        }).start();
        System.out.println("Thread t to be park...");
        LockSupport.park();
        System.out.println("Thread t unpark successfully");
    }
}

其运行结果是。

Thread t to be park...
unpark thread t
Thread t unpark successfully

到此为止,ReentrantLock公平锁的Lock方法已经讲解完毕了。继续深入。我们接着来看它的tryAcquire方法。也就是看看它究竟是如何去抢锁的。

  // 可重入独占锁的公平实现
  protected final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState(); // 获取当前AQS的状态,独占模式下如果为0说明未占用,如果大于0说明已经被占用
            if (c == 0) {
                if (!hasQueuedPredecessors() && //判断是否等待队列不为空且当前线程没有获得锁,其实就是当前线程是否需要排队
                    compareAndSetState(0, acquires)) { // CAS设置状态,如果成功则说明成功的获取到了这把锁
                    setExclusiveOwnerThread(current); // 将独占锁的线程拥有者设置为当前线程
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) { // 如果AQS状态不为0,说明锁被占用,判断占用者是否是当前线程
                int nextc = c + acquires; // 每加锁一次则状态值加一
                if (nextc < 0) //加到int溢出了
                    throw new Error("Maximum lock count exceeded");
                setState(nextc); 
                return true;
            }
            return false;  // 任何其它情况返回false,加锁失败
        }
    }

加锁过程算是讲完了,接下来看看它的解锁过程。

ReentrantLockunlock方法。原来调用的是release方法,状态传参1,因为释放锁的次数为1.

    public void unlock() {
        sync.release(1);
    }

看看AQSrelease方法。

    public final boolean release(int arg) {
        if (tryRelease(arg)) { // 尝试解锁
            Node h = head; 
            if (h != null && h.waitStatus != 0) //头节点不为空且waitStatus状态不为0(初始状态为0,当被设置成为signal后为-1)
                unparkSuccessor(h); // 唤醒下一个后继节点
            return true;
        }
        return false;
    }

接下来看看unparkSuccessor.看看它到底是怎么唤醒下一个节点的。

    private void unparkSuccessor(Node node) {
        int ws = node.waitStatus;
        if (ws < 0) // 如果等待状态<0,说明是signal状态,将其设置为0,即恢复状态
            compareAndSetWaitStatus(node, ws, 0); 
        // 获取当前节点的后继节点
        Node s = node.next;
        if (s == null || s.waitStatus > 0) { //如果没有下一个节点或者状态>0(已经取消),遍历节点找其它符合unpark要求的节点
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev) //从队尾往队前找
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null) //要是没有找到,就算了,找到了就unpark
            LockSupport.unpark(s.thread);
    }

再看看release中的tryRelease

       protected final boolean tryRelease(int releases) {
            int c = getState() - releases; //当前状态值减去要释放锁的次数(之前传的是1)
            if (Thread.currentThread() != getExclusiveOwnerThread()) // 独占锁,如果不是当前线程持有锁,抛出异常
                throw new IllegalMonitorStateException();
            boolean free = false;
            if (c == 0) { //解锁后状态值为0,则完全释放这把锁
                free = true;
                setExclusiveOwnerThread(null);
            }
            setState(c); //状态值
            return free; // 是否完全释放
        }

下面通过一张图对锁的加锁、释放机制做一个总结。非公平锁的情况大致进行介绍如下。压根没啥等待队列,上来直接哐哐CAS.

      final void lock() {
            if (compareAndSetState(0, 1))
                setExclusiveOwnerThread(Thread.currentThread());
            else
                acquire(1);
        }

        final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/langs/731100.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-04-27
下一篇 2022-04-27

发表评论

登录后才能评论

评论列表(0条)

保存