精通Java并发编程N+2:万字梳理读写锁ReentrantReadWriteLock原理源码分析

精通Java并发编程N+2:万字梳理读写锁ReentrantReadWriteLock原理源码分析,第1张

文章目录
  • 一、ReentrantReadWriteLock实现原理
    • 1、概述
      • 1)面试题2:为什么要使用读写锁?
      • 2)面试题3:为什么读写锁中读锁和写锁采用一个变量来标识?
      • 3)面试题4:读写锁如何保证写线程不饿死?
      • 4)如何使用读写锁?
    • 2、源码分析原理
      • 1)面试题1:读写锁如何实现的(实现原理)?
      • 2)成员变量/构造函数
        • 1> 内部类Sync
        • 1> 面试题5:为什么实例化Sync时要获取到state变量之后再setState()?
        • 3> 面试题6:如何知道当前线程获取到了多少次读锁?
        • 4> 面试题7:假如全程只有一个线程获取读锁,还有必要使用ThreadLocal维护线程拥有的读锁数量吗?
        • 4> getReadHoldCount()
      • 3)写锁
        • 1> 写锁的获取 WriteLock#lock()
        • 1.1> writerShouldBlock()
        • 2> 写锁的释放 WriteLock#unlock()
      • 4)读锁
        • 1> 读锁的获取 ReadLock#lock()
        • 1.1> fullTryAcquireShared()
        • 1.2> readerShouldBlock()
        • 2> 读锁的释放 ReadLock#unlock()
      • 5)锁升级、锁降级
        • 1>锁升级
        • 2> **锁降级**:
        • 3> 锁降级的作用?

一、ReentrantReadWriteLock实现原理 1、概述 1)面试题2:为什么要使用读写锁?

独占锁:在独占锁模式下, 每次只能有一个线程能持有锁,ReentrantLock就是以独占方式实现的互斥锁。它是一种悲观保守的加锁策略,避免了读/读冲突,只允许一个线程读,其他线程就要等待。

缺点: 它限制了不必要的并发性,因为读 *** 作并不会影响数据的一致性。

共享锁:

是一种乐观锁,它放宽了加锁策略,允许多个线程同时访问共享资源。

我们一般认为锁都是独占锁。但有些时候不用加锁(比如多个线程同时读一个变量)。

  1. 本身读 *** 作是不用加锁的,但如果一个线程读的时候,其他线程在写,就会出现误读问题。所以为了避免误读问题,不允许读的时候同时进行写,所以要加锁。
  2. 但是如果读 加互斥锁的话就很浪费并发性能,所以给读加共享锁,使读读之间可以并发。

使用场景?

  • 读并发多,写并发少的场景
2)面试题3:为什么读写锁中读锁和写锁采用一个变量来标识?

因为如果拆分成两个,判断写锁和读锁会拆分两步、CAS设置写锁和读锁也是两步,无法保证加锁、获取锁的原子性 *** 作,会有线程安全问题。

现在采用一个32位的int变量,高16位表示读锁16位表示写锁;由于只需要 *** 作一个变量,所以只需要一次CAS即可保证CAS设置写锁、读锁的原子性。

3)面试题4:读写锁如何保证写线程不饿死?

在获取读锁之前,会先判断当前有没有写线程在排队。针对公平锁和非公平锁实现方式略有差别;

非公平锁中:

  • 会判断AQS阻塞队列中等待的第一个线程是不是写线程。

公平锁中:

  • 会判断AQS阻塞队列的前面有没有线程在排队。
4)如何使用读写锁?

ReentrantReadWriteLock(适用于多读一写)

ReadWriteLock lock = new ReetrantReadWriteLock();

读 *** 作lock.readLock().lock(); lock.readLock().unlock();

写 *** 作lock.writeLock().lock(); lock.writeLock().unlock();

如果写 *** 作不释放锁,别的 *** 作就没法进行。

2、源码分析原理

ReentrantReadWriteLock本质上也是依赖于AQS同步框架实现的一个锁,所以有很多核心逻辑是依赖AQS实现的,关于AQS的源码分析见博文:https://blog.csdn.net/Saintmm/article/details/124308023

1)面试题1:读写锁如何实现的(实现原理)?

ReentranReadWriteLock将锁分割为两把锁:读锁、写锁;用于读写互斥区中保护的变量。

  • ReentrantReadWriteLock的核心是由一个基于AQS的同步器Sync,和其扩展出的两把锁:ReadLock、WriteLock所组成;

  • Sync表示读锁还是写锁是从AQS中volatile关键字修饰的state变量看出的,state字段被分为高16位和低16位,高16位表示读锁个数,低16位表示写锁个数;

  • 读写锁也有公平锁和非公平锁之分,主要针对写线程而言,代码中体现在writerShouldBlock()方法;

    • 非公平锁:直接返回FALSE,上来就直接去抢锁,不阻塞。
    • 公平锁:调用AQS的hasQueuedPredecessors()方法判断AQS的阻塞队列里是否有其他线程在排队,如果有则返回TRUE,遵循FIFO规则阻塞写线程。

    而读线程的公平非公平在于获取读锁时,防止写线程饥饿而阻塞读线程的时机,源码中体现在readerShouldBlock()方法;

    • 非公平锁:判断AQS阻塞队列的头结点的下一个节点是不是写线程,如果是,则返回TRUE,表示读线程需要阻塞。
    • 公平锁:调用AQS的hasQueuedPredecessors()方法判断AQS的阻塞队列里是否有其他线程在排队,如果有则返回TRUE,表示读线程需要阻塞。

1> 在Sync实例化时:

有两个点比较有意思:

1> setState(getState()); --> 获取到state变量之后再setState()。

  • 这样做是为了保证每个线程持有读锁数量readHolds的可见性。
    • 首先获取当前state值,然后再将获取到的state值赋值给state;
    • state变量是被volatile修饰的,对一个volatile变量的写 *** 作,会保证其前面对普通变量的写 *** 作对volatile写 *** 作而言是可见的。
    • 即:当第二个 *** 作是volatile写时,无论第一个 *** 作是什么,都不会发生指令重排序

2> 针对第一个获取读锁的线程,采用特殊方式保存其获取读锁的数量:

  • state变量的高16位表示读锁的总重入个数,使用ThreadLocalHoldCounter类型的变量readHolds来记录每个线程获取到了多少次共享锁;
  • 然而ThreadLocal是占用内存的,如果全程只有一个线程获取读锁,是没有必要使用ThreadLocal的;
  • 所以读写锁对 获取线程拥有的读锁数量做了一个优化,如果当前线程是在并发获取读锁时第一个获取到读锁的线程,会采用一个volatile关键字修饰的int类型变量firstReaderHoldCount专门记录当前线程拥有的读锁数量;firstReader记录持有锁的线程;

2> 写锁的获取(WriteLock#lock())

3> 写锁的释放(WriteLock#unlock())

4> 读锁的获取(ReadLock#lock())

5> 读锁的释放(ReadLock#unlock())

另外:读写锁适用于读多写少的场景:

  • 读读可以并发,写锁与其他锁互斥;即:写写互斥、写读互斥、读读兼容。

源码中的应用?

  • Consumer并发消费模式下,RocketMQ清理过期消息(ProcessQueue#cleanExpiredMsg())时使用读写锁。(博主对RocketMQ比较熟悉,不熟悉的不建议给自己挖坑)
2)成员变量/构造函数
public class ReentrantReadWriteLock
        implements ReadWriteLock, java.io.Serializable {
        
    // 用于获取读锁
    private final ReentrantReadWriteLock.ReadLock readerLock;
}

readerLock和writerLock用于支撑ReadWriteLock接口的读锁和写锁方法。通过构造方法得知,读写锁对象的创建和使用均依赖于公平锁或非公平锁同步器Sync。

public class ReentrantReadWriteLock
    implements ReadWriteLock, java.io.Serializable {

    // 读锁对象
    private final ReentrantReadWriteLock.ReadLock readerLock;

    // 写锁对象
    private final ReentrantReadWriteLock.WriteLock writerLock;

    // 同步器
    final Sync sync;
    
    // 默认是非公平锁
    public ReentrantReadWriteLock() {
        this(false);
    }

    // 根据fair变量选择创建公平锁还是非公平锁。true表示公平锁
    public ReentrantReadWriteLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
        // 用同步器来创建读写锁对象
        readerLock = new ReadLock(this);
        writerLock = new WriteLock(this);
    }

}
1> 内部类Sync

核心变量和构造器:

abstract static class Sync extends AbstractQueuedSynchronizer {

  // 锁的状态位被分为两个部分:高16位表示读锁的个数、低16位表示写锁的个数。
  static final int SHARED_SHIFT   = 16;
  // 用于对高16位 *** 作: +1、-1
  static final int SHARED_UNIT    = (1 << SHARED_SHIFT);
  // 最大读锁量
  static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;
  // 用于获取低16位的值
  static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;

  // 获取高16位,即读锁的数量
  static int sharedCount(int c)    { return c >>> SHARED_SHIFT; }
  // 获取低16位,即写锁的数量
  static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }

  // 高16位为所有读锁拥有,如果我想获取每个线程对于读锁重入的次数,怎么做?
  //    1)采用ThreadLocal来进行统计,每个线程自己统计自己的。
  static final class HoldCounter {
    int count = 0;
    // Use id, not reference, to avoid garbage retention
    final long tid = getThreadId(Thread.currentThread());
  }

  // 继承自ThreadLocal,重写了其中的initialValue()方法,该方法将在线程第一次获取该变量时 初始化HoldCounter计数器。
  static final class ThreadLocalHoldCounter
    extends ThreadLocal<HoldCounter> {
    public HoldCounter initialValue() {
      return new HoldCounter();
    }
  }

  // 创建ThreadLocal对象
  private transient ThreadLocalHoldCounter readHolds;

  // 缓存最后一个线程获取的读锁数量
  private transient HoldCounter cachedHoldCounter;

  // 获取读锁的第一个线程
  private transient Thread firstReader = null;
    
  // 保存读锁的第一个线程 获取到的读锁数量
  private transient int firstReaderHoldCount;

  Sync() {
    // 初始化ThreadLocalHoldCounter() ThreadLocal对象
    readHolds = new ThreadLocalHoldCounter();
    // 可见性保证,使用state变量的volatile语义
    setState(getState());
  }
    
    // 读锁是否应该阻塞
    abstract boolean readerShouldBlock();

    // 写锁是否应该阻塞
    abstract boolean writerShouldBlock();
}
1> 面试题5:为什么实例化Sync时要获取到state变量之后再setState()?

保证每个线程持有读锁数量readHolds的可见性。

  • 首先获取当前state值,然后再将获取到的state值赋值给state;

  • state变量是被volatile修饰的,对一个volatile变量的写 *** 作,会保证其前面普通变量的写 *** 作对volatile写 *** 作的可见性。

3> 面试题6:如何知道当前线程获取到了多少次读锁?

state变量的高16位表示读锁的总重入个数,使用ThreadLocalHoldCounter类型的变量readHolds来记录每个线程获取到了多少次共享锁;

  • ReentrantReadWriteLock的内部类ThreadLocalHoldCounter继承自ThreadLocal,确保初始化的时候就创建ThreadLocal而不是每次用的时候才创建;

我们可以通过getReadHoldCount()方法获取当前线程拥有的读锁个数。

此外:写锁是不需要ThreadLocal的,因为能获取到写锁的只有一个线程;直接取低16位即可。

4> 面试题7:假如全程只有一个线程获取读锁,还有必要使用ThreadLocal维护线程拥有的读锁数量吗?

ThreadLocal是占用内存的,如果全程只有一个线程获取读锁,是没有必要使用ThreadLocal的;所以读写锁对 获取线程拥有的读锁数量做了一个优化,如果当前线程是在并发获取读锁时第一个获取到读锁的线程,会采用一个volatile关键字修饰的int类型变量firstReaderHoldCount专门记录当前线程拥有的读锁数量;firstReader记录持有锁的线程;

private transient Thread firstReader = null;
private transient int firstReaderHoldCount;
4> getReadHoldCount()
final int getReadHoldCount() {
    if (getReadLockCount() == 0)
        return 0;

    Thread current = Thread.currentThread();
    if (firstReader == current)
        return firstReaderHoldCount;

    HoldCounter rh = cachedHoldCounter;
    if (rh != null && rh.tid == getThreadId(current))
        return rh.count;

    int count = readHolds.get().count;
    if (count == 0) readHolds.remove();
    return count;
}
3)写锁 1> 写锁的获取 WriteLock#lock()

直接通过内部组合的AQS的子类Sync,进入到AQS获取锁的模板方法acquire()

public void lock() {
    sync.acquire(1);
}

AQS的acquire()方法:

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

ReentrantReadWriteLock的内部类Sync实现了tryAcquire()方法,自定义获取锁的逻辑,用来判断锁是否可以获取成功;主要逻辑如下:

  • 如果有线程获取到了读锁、或者当前线程不是持有互斥锁的线程,则获取写锁失败。
  • 如果当前线程已经获取到了写锁,则进行锁重入 *** 作。
  • 如果当前锁的数量为0,即AQS的state变量为0,那么当前线程可以尝试获取写锁,不过要进一步通过writerShouldBlock()方法判断当前写线程是否应该阻塞;
  • 不阻塞的话,获取到写锁,并且修改锁的状态,设置独占线程为当前线程。
protected final boolean tryAcquire(int acquires) {
  // 获取当前线程
  Thread current = Thread.currentThread();
  // 获取当前锁状态和互斥锁的数量
  int c = getState();
  int w = exclusiveCount(c);
  // 线程获取到了锁(读锁或写锁)
  if (c != 0) {
    // 有其他线程获取到了读锁 或 当前线程不是持有互斥锁的线程
    if (w == 0 // 有线程获取到了读锁
        || current != getExclusiveOwnerThread()) // 其他线程持有写锁
      // 加写锁失败
      return false;
    
    // 写锁重入,低16位直接加
    if (w + exclusiveCount(acquires) > MAX_COUNT)
      throw new Error("Maximum lock count exceeded");
    // Reentrant acquire
    setState(c + acquires);
    return true;
  }
  
  // 既没有写锁也没有读锁
  if (writerShouldBlock() || // 由子类判断当前线程是否应该获取写锁
      !compareAndSetState(c, c + acquires)) // 通过CAS抢写锁
    return false;
  // 当前线程抢写锁成功
  setExclusiveOwnerThread(current);
  return true;
}
1.1> writerShouldBlock()

在Sync#tryAcquire()方法中,最后会调用writerShouldBlock()方法判断当前线程是否可以获取到写锁;而Sync#writerShouldBlock()方法是一个抽象方法;

abstract boolean writerShouldBlock();

具体的实现在Sync的子类NonFairSync和FairSync中,因此有公平锁和非公平锁之分;

  • 非公平锁,直接返回FALSE,表示线程可以直接去获取写锁;

    final boolean writerShouldBlock() {
        return false; // writers can always barge
    }
    
  • 公平锁,调用AQS的hasQueuedPredecessors()方法判断AQS的阻塞队列里是否有写线程在排队,如果有返回则TRUE。

    final boolean writerShouldBlock() {
        return hasQueuedPredecessors();
    }
    
2> 写锁的释放 WriteLock#unlock()

直接通过内部组合的AQS的子类Sync,进入到AQS获取锁的模板方法release()

public void unlock() {
    sync.release(1);
}

AQS的release()方法:

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

ReentrantReadWriteLock的内部类Sync实现了tryRelease()方法,自定义释放锁的逻辑,用来判断锁是否可以释放成功;主要逻辑如下:

  • 首先如果当前线程没有获取到写锁但想释放,则会直接抛出异常。
  • 否则更改状态(自减),因为是可重入锁,所以当锁的数量减少到0时,修改状态setState(0)。
protected final boolean tryRelease(int releases) {
    // 没有获取到写锁的线程,无法释放写锁
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    int nextc = getState() - releases;
    // 释放写锁之后,判断写锁状态是否为0(锁重入)
    boolean free = exclusiveCount(nextc) == 0;
    // 如果下一个状态值为0,表明当前线程完全释放了锁。
    if (free)
        // 将当前线程对象从OwnerThread中移除
        setExclusiveOwnerThread(null);
    // 全局设置state变量。
    setState(nextc);
    // 如果返回为true,由AQS完成后面线程的唤醒。
    return free;
}
4)读锁 1> 读锁的获取 ReadLock#lock()

直接通过内部组合的AQS的子类Sync,进入到AQS获取锁的模板方法acquireShared()

public void lock() {
    sync.acquire(1);
}

AQS的acquireShared()方法:

public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}

ReentrantReadWriteLock的内部类Sync实现了tryAcquireShared()方法,自定义获取锁的逻辑,用来判断锁是否可以获取成功;主要逻辑如下:

直接通过AQS的模板方法acquireShared()方法实现,Sync类实现了AQS的tryAcquireShared()方法,自定义获取锁的逻辑,用来判断读锁是否可以获取成功:

  • 因为读写需要互斥,所以如果存在写锁并且持有写锁的线程不是当前线程,则当前线程获取读锁失败去排队。

  • 另外在读并发高的情况下,为了防止写锁饿死;

    • 非公平锁模式下,会判断AQS阻塞队列中的第一个线程是不是写线程;如果是,读锁就要继续等待,fullTryAcquireShared()方法做无限自旋尝试读锁 *** 作,因为写锁的优先级比较高
    • 公平锁模式下就先判断AQS阻塞队列中是非有写线程,有的话读锁就要继续等待,进入fullTryAcquireShared()方法做无限自旋尝试读锁 *** 作。
  • 接着使用CAS获取读锁,实际上就是将state变量的高16位自增。

  • 获取读锁成功后在ThreadLocal中记录当前线程获取到的读锁次数。

  • 而ThreadLocal是占用内存,出于内存占用优化的一个考量,当前线程是在并发获取读锁时第一个获取到读锁的线程,会采用一个volatile关键字修饰的int类型变量firstReaderHoldCount专门记录当前线程拥有的读锁数量;firstReader记录持有锁的线程;

    • 另外维护读锁信息时,它会采用头尾指针的思想,除了维护第一个获取到读锁的线程和读锁信息,还会使用cachedHoldCounter变量维护最后一个获取到读锁的线程的读锁信息。
  • 最后,JUC的作者有一个编程习惯,处于性能考虑,他喜欢将一些动作前置,如果不成功再进入无限自循环逻辑。像这里的会先在acquireShared()方法中尝试获取读锁,如果获取读锁失败再进入fullTryAcquireShared()方法无限自循环尝试获取读锁。

protected final int tryAcquireShared(int unused) {
 
  Thread current = Thread.currentThread();
  // 获取锁的状态
  int c = getState();
  if (exclusiveCount(c) != 0 && // 有线程持有写锁
      getExclusiveOwnerThread() != current) // 持有写锁的线程不是当前线程
    // 告诉AQS获取共享锁失败
    return -1;
  
  // 获取读锁的持有数量
  int r = sharedCount(c);
  if (!readerShouldBlock() && // 让子类判定读锁应不应该被阻塞,防止写锁饥饿
      r < MAX_COUNT && // 判断读锁是否发生溢出
      compareAndSetState(c, c + SHARED_UNIT)) { // CAS增加state的高16位的读锁持有数
    // 增加高16位之前的计数为0,表示当前线程是第一个获取读锁的线程、且第一次获取。
    if (r == 0) {
        // 注意:这里持有两个变量来优化ThreadLocal
      //设置读锁的第一个拥有者为当前线程
      firstReader = current;
      // 设置当前线程拥有读锁数量为1
      firstReaderHoldCount = 1;
     // 当前线程已经获取到的读锁
    } else if (firstReader == current) {
      // 读锁重入,直接++
      firstReaderHoldCount++;
    } else {
        // 当前线程不是第一个读线程,则将其获取读锁的次数保存在ThreadLocal中。
        // 看返回的是不是最后一个线程,
      HoldCounter rh = cachedHoldCounter;
      if (rh == null || rh.tid != getThreadId(current))
          // 不是最后一个就初始化一个ThreadLocal变量
        cachedHoldCounter = rh = readHolds.get();
      else if (rh.count == 0)
        readHolds.set(rh);
      rh.count++;
    }
    return 1;
  }
    // 上面的代码是对这一步的优化;这里才是真正获取读锁的地方。
  return fullTryAcquireShared(current);
}

JUC的作者道格.李写代码有一个习惯:会把一些常见的场景前置,保证性能;这可以看做是代码的一种优化。

1.1> fullTryAcquireShared()

在这里的表现为:把fullTryAcquireShared()方法中尝试获取锁的逻辑先提前到Sync#tryAcquireShared()方法中;

// 真正获取读锁的地方
	final int fullTryAcquireShared(Thread current) {

    // 用来保存当前线程获取的读锁数量
    HoldCounter rh = null;
    for (;;) {
        int c = getState();
        // 当前已经有线程获取到了写锁 且 当前获取写锁的线程不是当前线程。
        if (exclusiveCount(c) != 0) {
            if (getExclusiveOwnerThread() != current)
                return -1;
            // 道格.李的代码中会把For循环里的一些条件挪到前面,进行前置优化。
            // 子类中判断 需要阻塞读锁的情况
        } else if (readerShouldBlock()) {
            // 如果当前线程是第一个获取到读锁的线程
            if (firstReader == current) {
                // assert firstReaderHoldCount > 0;
            } else {
                // 当前线程不是第一个获取到读锁的线程,则移除ThreadLocal中的读锁信息
                if (rh == null) {
                    // 获取当前线程记录读锁重入次数的HoldCounter对象
                    rh = cachedHoldCounter;
                    if (rh == null || rh.tid != getThreadId(current)) {
                        rh = readHolds.get();
                        if (rh.count == 0)
                            readHolds.remove();
                    }
                }
                // 当前读锁重入次数为0时,表明没有获取到读锁,此时返回-1,阻塞当前线程
                if (rh.count == 0)
                    return -1;
            }
        }
        // 读锁个数达到上限
        if (sharedCount(c) == MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
        // 此处和前面的代码一样,就是对读锁个数执行+1 *** 作
        if (compareAndSetState(c, c + SHARED_UNIT)) {
            if (sharedCount(c) == 0) {
                firstReader = current;
                firstReaderHoldCount = 1;
            } else if (firstReader == current) {
                firstReaderHoldCount++;
            } else {
                if (rh == null)
                    rh = cachedHoldCounter;
                if (rh == null || rh.tid != getThreadId(current))
                    rh = readHolds.get();
                else if (rh.count == 0)
                    readHolds.set(rh);
                rh.count++;
                cachedHoldCounter = rh; // cache for release
            }
            return 1;
        }
    }
}
1.2> readerShouldBlock()

在Sync#tryAcquireShared()方法中,会调用readerShouldBlock()方法判断当前线程获取读锁应不应该被阻塞,进而防止写锁饥饿;而Sync#readerShouldBlock()方法是一个抽象方法;

abstract boolean readerShouldBlock();

具体的实现在Sync的子类NonFairSync和FairSync中,因此有公平锁和非公平锁之分;

  • 非公平锁,判断AQS阻塞队列的头结点的下一个节点是不是写线程,如果是,则返回TRUE,表示获取读锁需要阻塞;

    final boolean readerShouldBlock() {
        return apparentlyFirstQueuedIsExclusive();
    }
    

    AQS#apparentlyFirstQueuedIsExclusive():

    final boolean apparentlyFirstQueuedIsExclusive() {
        Node h, s;
        return (h = head) != null &&
            (s = h.next)  != null &&
            !s.isShared()         &&
            s.thread != null;
    }
    
  • 公平锁,调用AQS的hasQueuedPredecessors()方法判断AQS的阻塞队列里是否有其他线程在排队,如果有返回则TRUE。

    final boolean readerShouldBlock() {
        return hasQueuedPredecessors();
    }
    
2> 读锁的释放 ReadLock#unlock()

直接通过内部组合的AQS的子类Sync,进入到AQS获取锁的模板方法releaseShared()

public void unlock() {
    sync.releaseShared(1);
}

AQS的releaseShared()方法:

public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}

ReentrantReadWriteLock的内部类Sync实现了tryReleaseShared()方法,自定义释放锁的逻辑,用来判断锁是否可以释放成功;主要逻辑如下:

  • 从ThreadLocal中自减当前线程持有的读锁数量,如果减完读锁个数为0,remove掉ThreadLocal中保存的读锁信息;
  • 如果是第一个获取读锁的线程,则只针对firstReaderHoldCount变量和firstReader变量做自减和置空 *** 作;
  • 然后CAS修改state变量高16位(读锁个数),做自减1 *** 作;
  • 如果当前线程不再持有锁,唤醒AQS阻塞队列中的写线程。
protected final boolean tryReleaseShared(int unused) {
    // 获取当前线程
    Thread current = Thread.currentThread();
    // 如果当前线程是第一个获取到读锁的线程
    if (firstReader == current) {
        // 只获取到了一次读锁,代表可以直接释放读锁
        if (firstReaderHoldCount == 1)
            // 将firstReader设置为null
            firstReader = null;
        else
            // 重入锁数 -- 
            firstReaderHoldCount--;
    } else {
        // 判断当前线程是不是最后一个获取到读锁的线程
        HoldCounter rh = cachedHoldCounter;
        if (rh == null || rh.tid != getThreadId(current))
            // 如果不是,则获取当前线程持有的读锁个数
            rh = readHolds.get();
        int count = rh.count;
        if (count <= 1) {
            // 锁个数等于1,代表当前线程已经释放完读锁,则不再需要ThreadLocal中持有的HoldCounter对象
            readHolds.remove();
            // 如果读锁个数小于1,锁状态非法,抛出异常
            if (count <= 0)
                throw unmatchedUnlockException();
        }
        // 重入锁个数-1
        --rh.count;
    }
    // 自旋尝试CAS对高16位(读锁) -1
    for (;;) {
        int c = getState();
        int nextc = c - SHARED_UNIT;
        if (compareAndSetState(c, nextc))
            // 如果当前线程不再持有锁,获选AQS队列中阻塞的写线程
            return nextc == 0;
    }
}
5)锁升级、锁降级 1>锁升级

读的时候写,即线程获取到了读锁,它又想获取写锁;

有两种情况:

  • 先释放读锁,再获取写锁(排队);
  • 不释放持有的读锁,直接获取写锁(非法状态)。

但是java不支持读的时候写(锁升级)–>在读锁释放之前,加写锁,再释放读锁。因为会出现数据不一致的问题。

2> 锁降级:

写的时候读,将锁降级为读锁;

有两种情况:

  • 先释放写锁,后获取读锁(可能会排队);
  • 不释放持有的写锁,直接获取读锁,然后再释放写锁。
lock.writeLock().lock();

lock.readLock().lock(); 

lock.writeLock().unlock();

lock.readLock().unlock(); 

锁降级的前提:读优先于写,并且所有线程都希望对数据变化敏感。

3> 锁降级的作用?
  • 保证数据的可见性,在一边读一边写的情况下提高性能。

  • 如果当前线程不获取读锁而直接释放写锁,假设此刻另一个线程T获取了写锁并修改了数据,阻塞了读锁的获取,那么获取读锁的线程获取到的数据有可能是不对的,因为写线程T又对数据做了更新 *** 作。

  • 如果这个线程获取了读锁,由于他遵循锁降级的步骤,线程T会被阻塞,直到当前线程使用数据并释放读锁之后,线程T才能获取写锁进行数据的更新。

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/langs/738834.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-04-28
下一篇 2022-04-28

发表评论

登录后才能评论

评论列表(0条)

保存