AQS原理解析

AQS原理解析,第1张

AQS原理解析 一、CAS原理 1、简介
CAS 全称是 compare and swap,是一种用于在多线程环境下实现同步功能的机制。CAS  *** 作包含三个 *** 作数 -- 内存位置、预期数值和新值。
CAS 的实现逻辑是将内存位置处的数值与预期数值想比较,若相等,则将内存位置处的值替换为新值。若不相等,则不做任何 *** 作。
具体实现,public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);
这个是一个native方法, 第一个参数为需要改变的对象,第二个为偏移量(即之前求出来的headOffset的值),第三个参数为期待的值,第四个为更新后的值
整个方法的作用是如果当前时刻的值等于预期值var4相等,则更新为新的期望值 var5,如果更新成功,则返回true,否则返回false;
Unsafe
Unsafe类是在sun.misc包下,不属于Java标准。但是很多Java的基础类库,包括一些被广泛使用的高性能开发库都是基于Unsafe类开发的,比如Netty、Hadoop、Kafka等;
Unsafe可认为是Java中留下的后门,提供了一些低层次 *** 作,如直接内存访问、线程调度等
2、问题

a、ABA问题,就是要维护的变量被替换后,又设置回来。类实例将无法辨别它被替换过。 举个例子,假设有一个变量x:

  1. 线程1试图用cas把x从A设置为C,所以它先查询x的值。(在这瞬间,线程切换)
  2. 线程2用cas把x设置为B
  3. 线程2用cas把x设置为A
  4. (线程切换回来)线程1查询到x的值为A,于是cas理所当然地把x改为了C。

问题是:线程1在查询x的过程中,x的值已经经历了A->B→A的转变,而线程1对此无所知。这就是ABA问题了。

解决方案:

对于ABA问题,比较有效的方案是引入版本号,内存中的值每发生一次变化,版本号都+1;在进行CAS *** 作时,不仅比较内存中的值,也会比较版本号,只有当二者都没有变化时,CAS才能执行成功。AtomicStampedReference类便是使用版本号来解决ABA问题的。

AtomicStampedReference原子类是一个带有时间戳的对象引用,在每次修改后,AtomicStampedReference不仅会设置新值而且还会记录更改的时间。当AtomicStampedReference设置对象值时,对象值以及时间戳都必须满足期望值才能写入成功,这也就解决了反复读写时,无法预知值是否已被修改的窘境。

b、性能问题

   我们使用时大部分时间使用的是 while true 方式对数据的修改,直到成功为止。优势就是相应极快,但当线程数不停增加时,性能下降明显,因为每个线程都需要执行,占用CPU时间。

二、锁 1、锁的类型

锁从宏观上分类,分为悲观锁与乐观锁。重量级锁是悲观锁的一种,自旋锁、轻量级锁与偏向锁属于乐观锁

1.1 乐观锁/悲观锁

乐观锁是一种乐观思想,即认为读多写少,遇到并发写的可能性低,每次去拿数据的时候都认为别人不会修改,所以不会上锁,但是在更新的时候会判断一下在此期间别人有没有去更新这个数据,采取在写时先读出当前版本号,然后加锁 *** 作(比较跟上一次的版本号,如果一样则更新),如果失败则要重复读-比较-写的 *** 作。

java中的乐观锁基本都是通过CAS *** 作实现的,CAS是一种更新的原子 *** 作,比较当前值跟传入值是否一样,一样则更新,否则失败。 

悲观锁是就是悲观思想,即认为写多,遇到并发写的可能性高,每次去拿数据的时候都认为别人会修改,所以每次在读写数据的时候都会上锁,这样别人想读写这个数据就会block直到拿到锁。java中的悲观锁就是Synchronized,AQS框架下的锁则是先尝试cas乐观锁去获取锁,获取不到,才会转换为悲观锁,如RetreenLock。

1.2 独享锁/共享锁

       独享锁是指该锁一次只能被一个线程所持有。

  共享锁是指该锁可被多个线程所持有。

  对于Java ReentrantLock而言,其是独享锁。但是对于Lock的另一个实现类ReadWriteLock,其读锁是共享锁,其写锁是独享锁。

  读锁的共享锁可保证并发读是非常高效的,读写,写读,写写的过程是互斥的。

  独享锁与共享锁也是通过AQS来实现的,通过实现不同的方法,来实现独享或者共享。

  对于Synchronized而言,当然是独享锁。

1.3 互斥锁/读写锁

  上面讲的独享锁/共享锁就是一种广义的说法,互斥锁/读写锁就是具体的实现。

  互斥锁在Java中的具体实现就是ReentrantLock。

  读写锁在Java中的具体实现就是ReadWriteLock。

1.4 可重入锁

  可重入锁又名递归锁,是指在同一个线程在外层方法获取锁的时候,在进入内层方法会自动获取锁。

  对于Java ReetrantLock而言,从名字就可以看出是一个重入锁,其名字是Re entrant Lock 重新进入锁。

  对于Synchronized而言,也是一个可重入锁。可重入锁的一个好处是可一定程度避免死锁。

1.5 公平锁/非公平锁

  公平锁是指多个线程按照申请锁的顺序来获取锁。

  非公平锁是指多个线程获取锁的顺序并不是按照申请锁的顺序,有可能后申请的线程比先申请的线程优先获取锁。有可能,会造成优先级反转或者饥饿现象。

  对于Java ReetrantLock而言,通过构造函数指定该锁是否是公平锁,默认是非公平锁。非公平锁的优点在于吞吐量比公平锁大。

  对于Synchronized而言,也是一种非公平锁。由于其并不像ReentrantLock是通过AQS的来实现线程调度,所以并没有任何办法使其变成公平锁。

1.6 分段锁

  分段锁其实是一种锁的设计,并不是具体的一种锁,对于ConcurrentHashMap而言,其并发的实现就是通过分段锁的形式来实现高效的并发 *** 作。

  我们以ConcurrentHashMap来说一下分段锁的含义以及设计思想,ConcurrentHashMap中的分段锁称为Segment,它即类似于HashMap(JDK7和JDK8中HashMap的实现)的结构,即内部拥有一个Entry数组,数组中的每个元素又是一个链表;同时又是一个ReentrantLock(Segment继承了ReentrantLock)。

  当需要put元素的时候,并不是对整个hashmap进行加锁,而是先通过hashcode来知道他要放在哪一个分段中,然后对这个分段进行加锁,所以当多线程put的时候,只要不是放在一个分段中,就实现了真正的并行的插入。

  但是,在统计size的时候,可就是获取hashmap全局信息的时候,就需要获取所有的分段锁才能统计。

  分段锁的设计目的是细化锁的粒度,当 *** 作不需要更新整个数组的时候,就仅仅针对数组中的一项进行加锁 *** 作。

1.7 偏向锁/轻量级锁/重量级锁

  这三种锁是指锁的状态,并且是针对Synchronized。在Java 5通过引入锁升级的机制来实现高效Synchronized。

  偏向锁是指一段同步代码一直被一个线程所访问,那么该线程会自动获取锁。降低获取锁的代价。

  轻量级锁是指当锁是偏向锁的时候,被另一个线程所访问,偏向锁就会升级为轻量级锁,其他线程会通过自旋的形式尝试获取锁,不会阻塞,提高性能。

  重量级锁是指当锁为轻量级锁的时候,另一个线程虽然是自旋,但自旋不会一直持续下去,当自旋一定次数的时候,还没有获取到锁,就会进入阻塞,该锁膨胀为重量级锁。重量级锁会让他申请的线程进入阻塞,性能降低。

各种锁的优缺点及适用场景

优点

缺点

适用场景

偏向锁加锁和解锁不需要额外的消耗,与执行非同步方法仅存在纳秒级的差距如果线程间存在竞争,会带来额外的锁撤销的消耗适用于只有一个线程访问同步块的情况轻量级锁竞争的线程不会堵塞,提高了程序的响应速度始终得不到锁的线程,使用自旋会消耗CPU追求响应时间,同步块执行速度非常块,只有两个线程竞争锁重量级锁线程竞争不使用自旋,不会消耗CPU线程堵塞,响应时间缓慢追求吞吐量,同步块执行速度比较慢,竞争锁的线程大于2个

1.8 自旋锁

  在Java中,自旋锁是指尝试获取锁的线程不会立即阻塞,而是采用循环的方式去尝试获取锁,这样的好处是减少线程上下文切换的消耗,缺点是循环会消耗CPU。

三、AQS解析 1、前置资料

当state=0时,表示无锁状态

当state>0时,表示已经有线程获得了锁,也就是state=1,但是因为ReentrantLock允许重入,所以同一个线程多次获得同步锁的时候,state会递增,比如重入5次,那么state=5。 而在释放锁的时候,同样需要释放5次直到state=0其他线程才有资格获得锁

AQS的Node中有每个Node自己的状态(waitStatus):

SIGNAL(-1) :前面有线程在运行,需要前面线程结束后,调用unpark()方法才能激活自己

CANCELLED(1):因为超时或中断,该线程已经被取消

ConDITION(-2):表明该线程被处于条件队列,就是因为调用了>- Condition.await而被阻塞

PROPAGATE(-3):传播共享锁


2、ReentrantLock

Sync这个类有两个具体的实现,分别是NofairSync(非公平锁),FailSync(公平锁).

 

 

ReentrantLock 主要是基于非公平锁的独占锁实现。在获得同步锁时,同步器维护一个FIFO双向同步(CLH)队列,获取状态失败的线程都会被加入到队列中并在队列中进行自旋;

 移出队列(或停止自旋)的条件是前驱节点为头节点且成功获取了同步状态。在释放同步状态时,同步器调用tryRelease(int arg)方法释放同步状态,然后唤醒头节点的后继节点。

2.1 NonfairSync.lock

final void lock() {

    if (compareAndSetState(0, 1)) //通过cas *** 作来修改state状态,表示争抢锁的 *** 作

      setExclusiveOwnerThread(Thread.currentThread());//设置当前获得锁状态的线程

    else

      acquire(1); //尝试去获取锁

}


2.2 AbstractQueuedSynchronizer.acquire

public final void acquire(int arg) {

        if (!tryAcquire(arg) &&  //通过tryAcquire尝试获取独占锁,如果成功返回true,失败返回false

            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))  //如果tryAcquire失败,则会通过addWaiter方法将当前线程封装成Node添加到AQS队列尾部

                                                            //acquireQueued Node作为参数,通过自旋去尝试获取锁

            selfInterrupt();

    }


2.3 NonfairSync.tryAcquire

final boolean nonfairTryAcquire(int acquires) {

    final Thread current = Thread.currentThread();

    int c = getState();

    if (c == 0) { //state=0说明当前是无锁状态

        //通过cas *** 作来替换state的值改为1

        if (compareAndSetState(0, acquires)) {

             //保存当前获得锁的线程

            setExclusiveOwnerThread(current);

            return true;

        }

    }

    //如果是同一个线程来获得锁,则直接增加重入次数

    else if (current == getExclusiveOwnerThread()) {

        int nextc = c + acquires; //增加重入次数

        if (nextc < 0) // overflow

            throw new Error("Maximum lock count exceeded");

        setState(nextc);

        return true;

    }

    return false;

}

2.4 AbstractQueuedSynchronizer.addWaiter

private Node addWaiter(Node mode) { //mode=Node.EXCLUSIVE

        //将当前线程封装成Node,并且mode为独占锁

        Node node = new Node(Thread.currentThread(), mode);

        // Try the fast path of enq; backup to full enq on failure

        // tail是AQS的中表示同步队列队尾的属性,刚开始为null,所以进行enq(node)方法

        Node pred = tail;

        if (pred != null) { //tail不为空的情况,说明队列中存在节点数据

            node.prev = pred;  //1.将当前线程的Node的prev节点指向tail 

            if (compareAndSetTail(pred, node)) {//2.通过cas讲node添加到AQS队列

                pred.next = node;//3.cas成功,把旧的tail的next指针指向新的tail

                return node;

            }

        }

        enq(node); //tail=null,将node添加到同步队列中

        return node;

    }

  

 

2.5 AbstractQueuedSynchronizer.enq

private Node enq(final Node node) {

        //自旋

        for (;;) {

            Node t = tail; //如果是第一次添加到队列,那么tail=null

            if (t == null) { // Must initialize

                //CAS的方式创建一个空的Node作为头结点

                if (compareAndSetHead(new Node()))

                   //此时队列中只一个头结点,所以tail也指向它

                    tail = head;

            } else { 

                node.prev = t;//1. 进行第二次循环时,tail不为null。将当前线程的Node结点的prev指向tail,然后使用CAS将tail指向Node

                if (compareAndSetTail(t, node)) {  //2. t此时指向tail,所以可以CAS成功,将tail重新指向Node。此时t为更新前的tail的值,即指向空的头结点

                    t.next = node; //3. t目前指向了头结点,将头结点的后续结点指向Node,返回头结点

                    return t;

                }

            }

        }

    }

  

 

2.6 AbstractQueuedSynchronizer.acquireQueued

将添加到队列中的Node作为参数传入acquireQueued方法,这里面会做抢占锁的 *** 作

final boolean acquireQueued(final Node node, int arg) {

    boolean failed = true;

    try {

        boolean interrupted = false;

        for (;;) {

            final Node p = node.predecessor();// 获取prev节点,若为null即刻抛出NullPointException

            if (p == head && tryAcquire(arg)) {// 如果前驱为head才有资格进行锁的抢夺

                setHead(node); // 获取锁成功后就不需要再进行同步 *** 作了,获取锁成功的线程作为新的head节点

                //凡是head节点,head.thread与head.prev永远为null, 但是head.next不为null

                p.next = null; // help GC 移除原来的初始化head节点

                failed = false; //获取锁成功

                return interrupted;

            }

            //如果获取锁失败,则根据节点的waitStatus决定是否需要挂起线程

            if (shouldParkAfterFailedAcquire(p, node) &&

                parkAndCheckInterrupt())// 阻塞 *** 作,正常情况下,获取不到锁,代码就在该方法停止了,直到被唤醒

                interrupted = true;

        }

    } finally {

        if (failed) // 如果抛出异常则取消锁的获取,进行出队(sync queue) *** 作

            cancelAcquire(node);

    }

}

2.7 AbstractQueuedSynchronizer.shouldParkAfterFailedAcquire 

  靠前继节点判断当前线程是否应该被阻塞,如果前继节点处于CANCELLED状态,则顺便删除这些节点重新构造队列。

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {

    int ws = pred.waitStatus; //前继节点的状态

    if (ws == Node.SIGNAL)//如果是SIGNAL状态,意味着当前线程需要被park

               return true;

如果前节点的状态大于0,即为CANCELLED状态时,则会从前节点开始逐步循环找到一个没有被“CANCELLED”节点设置为当前节点的前节点,返回false。在下次循环执行shouldParkAfterFailedAcquire时,返回true。这个 *** 作实际是把队列中CANCELLED的节点剔除掉。

    if (ws > 0) {// 如果前继节点是“取消”状态,则设置 “当前节点”的 “当前前继节点” 为 “‘原前继节点'的前继节点”。

        

        do {

            node.prev = pred = pred.prev;

        } while (pred.waitStatus > 0);

        pred.next = node;

    } else { // 如果前继节点为“0”或者“共享锁”状态,则设置前继节点为SIGNAL状态。

        

        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);

    }

    return false;

}


2.7 Sync.tryrelease

protected final boolean tryRelease(int releases) {

    int c = getState() - releases; // 这里是将锁的数量减1

    if (Thread.currentThread() != getExclusiveOwnerThread())// 如果释放的线程和获取锁的线程不是同一个,抛出非法监视器状态异常

        throw new IllegalMonitorStateException();

    boolean free = false;

    if (c == 0) {

    // 由于重入的关系,不是每次释放锁c都等于0,直到最后一次释放锁时,才会把当前线程释放

        free = true;

        setExclusiveOwnerThread(null);

    }

    setState(c);

    return free;

}

2.8 AbstractQueuedSynchronizer.unparkSuccessor

private void unparkSuccessor(Node node) {

    int ws = node.waitStatus;

    if (ws < 0)

        compareAndSetWaitStatus(node, ws, 0);

    Node s = node.next;

    if (s == null || s.waitStatus > 0) {//判断后继节点是否为空或者是否是取消状态,

        s = null;

        for (Node t = tail; t != null && t != node; t = t.prev)

            if (t.waitStatus <= 0) //然后从队列尾部向前遍历找到最前面的一个waitStatus小于0的节点,为什么向前遍历?

                s = t;

    }

//内部首先会发生的动作是获取head节点的next节点,如果获取到的节点不为空,则直接通过:“LockSupport.unpark()”方法来释放对应的被挂起的线程,这样一来将会有一个节点唤醒后继续进入循环进一步尝试tryAcquire()方法来获取锁

    if (s != null)

        LockSupport.unpark(s.thread); //释放许可

}

简要总结一下AQS的流程的一些特性:
    • 关键获取锁、释放锁 *** 作由AQS子类实现:acquire-release、acquireShared-releaseShared;
    • 维护了一个FIFO链表结构的队列,通过自旋方式将新结点添加到队尾;
    • 添加结点时会从前驱结点向前遍历,跳过那些处于CANCELLED状态的结点;
    • 释放结点时会从队尾向前遍历,踢出CANCELLED状态的结点,然后唤醒后继结点;

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/4668461.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-11-06
下一篇 2022-11-06

发表评论

登录后才能评论

评论列表(0条)

保存