深入理解ReentrantWriteReadLock源码及获取锁和释放锁的过程

深入理解ReentrantWriteReadLock源码及获取锁和释放锁的过程,第1张

深入理解ReentrantWriteReadLock源码及获取锁和释放锁的过程 ReadLock 获取锁

首先来看获取锁的核心源码

protected final int tryAcquireShared(int unused) {
            
            Thread current = Thread.currentThread();
            //获取锁的状态
            //利用高16位表示读锁,低16位表示写锁
            int c = getState();
            //exclusiveCount(c) 这个方法就是获取低16位的值
            //如果不等于零,说明有写锁,再去判断持有锁的线程是不是当前线程
            //如果不是当前线程,则返回-1,调用doAcquireShared(arg)去入队
            if (exclusiveCount(c) != 0 &&
                getExclusiveOwnerThread() != current)
                return -1;
            //上面的if条件没走,有两种情况
            //第一种情况:没有写锁
            //第二种情况:有写锁,但是是持有写锁的线程去获取读锁,这就是锁降级
            // sharedCount(c) 该方法是获取读锁的状态,即获取读锁的次数(重入的也算)
            //即 r >=持有读锁线程的数量
            int r = sharedCount(c);
            // readerShouldBlock() 该方法体现出非公平读锁和公平读锁的区别
            //如果是公平锁,则该方法的执行逻辑是
            //如果等待队列里面有线程在等待并且不是当前线程,就返回true
            //如果是非公平锁
            //如果等待队列的头结点不为空,头结点的next节点也不为空,并且next
            //节点所等待的线程是要获取独占锁(写锁),而且next节点存的线程不为空
            //则当前线程去入队等待
            //至于为什么要是独占锁(写锁)才入队等待, 我认为是为了避免写锁饥饿
            //其他情况都是不用入队直接抢,抢不到再去入队
            //后面两个条件都很好理解
            //如果下面这个if判断整体为false
            //则会去执行fullTryAcquireShared(current)
            //这个方法到后面会详细解释
            if (!readerShouldBlock() &&
                r < MAX_COUNT &&
                compareAndSetState(c, c + SHARED_UNIT)) {
                // r=0,说明还没有线程持有读锁
                if (r == 0) {
                	//把当前线程设为第一个持有读锁的线程
                    firstReader = current;
                    firstReaderHoldCount = 1;
                //锁的重入
                } else if (firstReader == current) {
                    firstReaderHoldCount++;
                } else {
                	//获取缓存的HoldCounter
                    HoldCounter rh = cachedHoldCounter;
                    //如果当前的HoldCounter 为空,
                    //或者和当前线程的HoldCounter 不一样
                    //则把当前线程的HoldCounter 赋给rh
                    if (rh == null || rh.tid != getThreadId(current))
                        cachedHoldCounter = rh = readHolds.get();
                    //如果当前线程的HoldCounter 中存的count为0,
                    //说明该线程是第一次获取读锁,则把HoldCounter 存到当前
                    //线程的ThreadLocal中,和当前线程绑定
                    else if (rh.count == 0)
                        readHolds.set(rh);
                    //获取锁的次数加1
                    rh.count++;
                }
                return 1;
            }
            return fullTryAcquireShared(current);
        }

解析fullTryAcquireShared(current)的作用:
先看源码:

  
final int fullTryAcquireShared(Thread current) {
          
            HoldCounter rh = null;
            //for循环保证当前线程能够入队或者获取到读锁
            for (;;) {
            //继续往下分析,可以看出来,下面的代码和tryAcquireShared(int unused)
            //几乎没有区别,整个方法的作用就是返回1或者-1,保证当前线程能够入队等待或者获取到读锁
                int c = getState();
                if (exclusiveCount(c) != 0) {
                    if (getExclusiveOwnerThread() != current)
                        return -1;
                    // else we hold the exclusive lock; blocking here
                    // would cause deadlock.
                } else if (readerShouldBlock()) {
                    // Make sure we're not acquiring read lock reentrantly
                    if (firstReader == current) {
                        // assert firstReaderHoldCount > 0;
                    } else {
                        if (rh == null) {
                            rh = cachedHoldCounter;
                            if (rh == null || rh.tid != getThreadId(current)) {
                                rh = readHolds.get();
                                if (rh.count == 0)
                                    readHolds.remove();
                            }
                        }
                        if (rh.count == 0)
                            return -1;
                    }
                }
                if (sharedCount(c) == MAX_COUNT)
                    throw new Error("Maximum lock count exceeded");
                if (compareAndSetState(c, c + SHARED_UNIT)) {
                    if (sharedCount(c) == 0) {
                        firstReader = current;
                        firstReaderHoldCount = 1;
                    } else if (firstReader == current) {
                        firstReaderHoldCount++;
                    } else {
                        if (rh == null)
                            rh = cachedHoldCounter;
                        if (rh == null || rh.tid != getThreadId(current))
                            rh = readHolds.get();
                        else if (rh.count == 0)
                            readHolds.set(rh);
                        rh.count++;
                        cachedHoldCounter = rh; // cache for release
                    }
                    return 1;
                }
            }
        }
ReadLock 释放锁

首先来看释放锁的核心源码

protected final boolean tryReleaseShared(int unused) {
            Thread current = Thread.currentThread();
            //判断当前线程是不是第一个获取到读锁的线程
            if (firstReader == current) {
                // assert firstReaderHoldCount > 0;
                if (firstReaderHoldCount == 1)
                    firstReader = null;
                else
                    firstReaderHoldCount--;
            } else {
                HoldCounter rh = cachedHoldCounter;
                 //如果当前的HoldCounter 为空,
                 //或者和当前线程的HoldCounter 不一样
                 //则把当前线程的HoldCounter 赋给rh
                if (rh == null || rh.tid != getThreadId(current))
                    rh = readHolds.get();
                int count = rh.count;
                //如果count=1,当前线程会真正的失去这把锁
           
                if (count <= 1) {
                	//把ThreadLocal中存的HoldCounter 移除避免内存泄漏
                    readHolds.remove();
                    if (count <= 0)
                        throw unmatchedUnlockException();
                }
                --rh.count;
            }
            //for循环去CAS读锁的状态,直到修改成功,
            //如果nextc=0,说明所有持有读锁的线程都已经把读锁释放了
            //如果nextc != 0,说明还存在线程持有读锁
            //返回true 就会执行doReleaseShared()方法
            //该方法主要是去唤醒同步等待队列中的线程
            //下面会详细讲解该方法
            for (;;) {
                int c = getState();
                int nextc = c - SHARED_UNIT;
                if (compareAndSetState(c, nextc))
                    // Releasing the read lock has no effect on readers,
                    // but it may allow waiting writers to proceed if
                    // both read and write locks are now free.
                    return nextc == 0;
            }
        }

所有线程的读锁被释放后会去唤醒等待队列里的线程

private void doReleaseShared() {
	//这里的唤醒过程和ReentrantLock没有区别,用的同一个方法
   for (;;) {
       Node h = head;
       if (h != null && h != tail) {
           int ws = h.waitStatus;
           //ws==-1 说明有节点(线程)需要被唤醒
           if (ws == Node.SIGNAL) {
               if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                   continue;            // loop to recheck cases
               unparkSuccessor(h);
           }
           else if (ws == 0 &&
                    !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
               continue;                // loop on failed CAS
       }
       if (h == head)                   // loop if head changed
           break;
   }
    }

注意:这里是重点,如果不仔细理解,很难弄明白读锁的传播(读读共享)
之前在获取读锁的过程讲到,获取读锁失败的线程会被包装成一个node节点入队到同步等待队列,那么我们就来看看入队的过程
核心源码如下

private void doAcquireShared(int arg) {
		//addWaiter(Node.SHARED),
		//该方法会把当前线程包装成一个node节点,并把该节点入队
		//读锁节点,后续会根据是读写节点还是写锁节点来判断唤醒这个 *** 作是否继续往下传播
		//注意
		//注意
		//如果你是来入队的,则可以按照顺序来分析源码
		//如果你被唤醒的节点,你需要从线程阻塞的地方开始分析
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                	//被唤醒后尝试去获取锁
                    int r = tryAcquireShared(arg);
                    //大于等于0说明获取到了读锁
                    //有人会有疑问,为什么获取到的一定是读锁呢,
                    //因为只有被唤醒的是需要读锁的线程,才会来到这个方法
                    //如果被唤醒的线程是需要写锁的,会在另外一个方法中
                    if (r >= 0) {
                    	//这里就是体现读读共享和读锁的传播属性的地方
                    	//简单讲一下setHeadAndPropagate(node, r)的核心逻辑
                    	//该方法会去判断下一个节点是不是共享节点(读锁节点),
                    	//如果是就调用doAcquireShared(int arg)(就是本方法)
                    	//说白了就是递归调用,直到遇到写锁节点就返回(读写互斥),停止递归调用,
                    	//即停止读锁的传播
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        if (interrupted)
                            selfInterrupt();
                        failed = false;
                        return;
                    }
                }
                //线程(节点)阻塞的地方,也是线程被唤醒后开始执行的地方
                //把当前线程阻塞前,会把上一个节点的waitStatus改为-1,
                //这是告诉上一个节点,你出队的时候记得把我唤醒
                //线程被唤醒后会再一次执行for循环
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
WriteLock 获取锁

相信看懂ReadLock的源码之后,就会发现WriteLock的源码是非常简单的

protected final boolean tryAcquire(int acquires) {
            
            Thread current = Thread.currentThread();
            int c = getState();
            int w = exclusiveCount(c);
            if (c != 0) {
                // (Note: if c != 0 and w == 0 then shared count != 0)
                //c!=0且w==0,说明,无写锁,有读锁,读写互斥,所以直接去同步等待队列入队
                //w != 0,说明存在写锁,但是不是持有写锁的线程,因此也是直接去入队(写写互斥)
                if (w == 0 || current != getExclusiveOwnerThread())
                    return false;
                if (w + exclusiveCount(acquires) > MAX_COUNT)
                    throw new Error("Maximum lock count exceeded");
                // Reentrant acquire
                //写锁重入(有写锁,且持有写锁的线程是当前线程)
                setState(c + acquires);
                return true;
            }
            //writerShouldBlock()就是体现公平和非公平的区别
            //公平:同步等待中有线程等待且不是当前线程,则入队等待
            //非公平:先抢再说,抢不到我在入队等待 
            if (writerShouldBlock() ||
                !compareAndSetState(c, c + acquires))
                return false;
            //走到这一步说明,抢到了写锁并且还是第一次持有写锁
            setExclusiveOwnerThread(current);
            return true;
        }

再来看看写锁入队的源码:acquireQueued(addWaiter(Node.EXCLUSIVE), arg))

private Node addWaiter(Node mode) {
		//addWaiter(Node.EXCLUSIVE),声明一个独占锁(写锁)的节点
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        //pred为空,说明同步等待队列还没有初始化
        if (pred != null) {
        	//主要的工作就是将node节点添加到同步等待队列的尾部
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        //初始化同步等待队列,这里不细讲,就是一个双向链表的初始化
        enq(node);
        return node;
    }

acquireQueued(node, arg))这个方法的主要作用是写锁阻塞和被唤醒去抢锁
下面来看它的源码

final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                //查看当前线程节点前面的节点是不是头结点
                //如果是,说明该当前线程去抢锁了(tryAcquire(arg))
                
                if (p == head && tryAcquire(arg)) {
                //抢到就把同步等待队列的头结点设为持有当前线程的节点
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                //抢锁失败,就阻塞,不过阻塞之前要把持有当前线程的节点的前面那个节点的,
                //waitStatus设为-1,告诉前面的那个节点,你执行了之后要唤醒我
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
WriteLock 释放锁

直接看源码

public final boolean release(int arg) {
        if (tryRelease(arg)) {
        	//释放成功就唤醒头结点的下一个节点
        	//因为头结点代表当前线程(这是同步等待队列的一个设计的精妙之处)
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
protected final boolean tryRelease(int releases) {
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            int nextc = getState() - releases;
            //free 为true,说明写锁完全释放
            //为false 说明重入,返回false 还不能释放
            boolean free = exclusiveCount(nextc) == 0;
            if (free)
                setExclusiveOwnerThread(null);
            setState(nextc);
            return free;
        }

结束语:在我看来,彻底理解了ReentrantWriteReadLock的获取锁和释放锁的逻辑之后,再去学习
ReentrantLock,Semaphore(信号量),CountDownLatch(发令q),CyclicBarrier(循环栅栏)会非常容易,
因为ReentrantWriteReadLock包含了独占锁的加锁解锁逻辑和共享锁的加锁解锁逻辑,这些并发控制工具类无非就是在这些的基础上进行修改,以满足相应的需求.如果ReentrantWriteReadLock理解有困难的话,建议先去看看ReentrantLock的源码,它是一个独占锁,不是特别复杂.

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5686248.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存