首先来看获取锁的核心源码
protected final int tryAcquireShared(int unused) { Thread current = Thread.currentThread(); //获取锁的状态 //利用高16位表示读锁,低16位表示写锁 int c = getState(); //exclusiveCount(c) 这个方法就是获取低16位的值 //如果不等于零,说明有写锁,再去判断持有锁的线程是不是当前线程 //如果不是当前线程,则返回-1,调用doAcquireShared(arg)去入队 if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current) return -1; //上面的if条件没走,有两种情况 //第一种情况:没有写锁 //第二种情况:有写锁,但是是持有写锁的线程去获取读锁,这就是锁降级 // sharedCount(c) 该方法是获取读锁的状态,即获取读锁的次数(重入的也算) //即 r >=持有读锁线程的数量 int r = sharedCount(c); // readerShouldBlock() 该方法体现出非公平读锁和公平读锁的区别 //如果是公平锁,则该方法的执行逻辑是 //如果等待队列里面有线程在等待并且不是当前线程,就返回true //如果是非公平锁 //如果等待队列的头结点不为空,头结点的next节点也不为空,并且next //节点所等待的线程是要获取独占锁(写锁),而且next节点存的线程不为空 //则当前线程去入队等待 //至于为什么要是独占锁(写锁)才入队等待, 我认为是为了避免写锁饥饿 //其他情况都是不用入队直接抢,抢不到再去入队 //后面两个条件都很好理解 //如果下面这个if判断整体为false //则会去执行fullTryAcquireShared(current) //这个方法到后面会详细解释 if (!readerShouldBlock() && r < MAX_COUNT && compareAndSetState(c, c + SHARED_UNIT)) { // r=0,说明还没有线程持有读锁 if (r == 0) { //把当前线程设为第一个持有读锁的线程 firstReader = current; firstReaderHoldCount = 1; //锁的重入 } else if (firstReader == current) { firstReaderHoldCount++; } else { //获取缓存的HoldCounter HoldCounter rh = cachedHoldCounter; //如果当前的HoldCounter 为空, //或者和当前线程的HoldCounter 不一样 //则把当前线程的HoldCounter 赋给rh if (rh == null || rh.tid != getThreadId(current)) cachedHoldCounter = rh = readHolds.get(); //如果当前线程的HoldCounter 中存的count为0, //说明该线程是第一次获取读锁,则把HoldCounter 存到当前 //线程的ThreadLocal中,和当前线程绑定 else if (rh.count == 0) readHolds.set(rh); //获取锁的次数加1 rh.count++; } return 1; } return fullTryAcquireShared(current); }
解析fullTryAcquireShared(current)的作用:
先看源码:
final int fullTryAcquireShared(Thread current) { HoldCounter rh = null; //for循环保证当前线程能够入队或者获取到读锁 for (;;) { //继续往下分析,可以看出来,下面的代码和tryAcquireShared(int unused) //几乎没有区别,整个方法的作用就是返回1或者-1,保证当前线程能够入队等待或者获取到读锁 int c = getState(); if (exclusiveCount(c) != 0) { if (getExclusiveOwnerThread() != current) return -1; // else we hold the exclusive lock; blocking here // would cause deadlock. } else if (readerShouldBlock()) { // Make sure we're not acquiring read lock reentrantly if (firstReader == current) { // assert firstReaderHoldCount > 0; } else { if (rh == null) { rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) { rh = readHolds.get(); if (rh.count == 0) readHolds.remove(); } } if (rh.count == 0) return -1; } } if (sharedCount(c) == MAX_COUNT) throw new Error("Maximum lock count exceeded"); if (compareAndSetState(c, c + SHARED_UNIT)) { if (sharedCount(c) == 0) { firstReader = current; firstReaderHoldCount = 1; } else if (firstReader == current) { firstReaderHoldCount++; } else { if (rh == null) rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) rh = readHolds.get(); else if (rh.count == 0) readHolds.set(rh); rh.count++; cachedHoldCounter = rh; // cache for release } return 1; } } }ReadLock 释放锁
首先来看释放锁的核心源码
protected final boolean tryReleaseShared(int unused) { Thread current = Thread.currentThread(); //判断当前线程是不是第一个获取到读锁的线程 if (firstReader == current) { // assert firstReaderHoldCount > 0; if (firstReaderHoldCount == 1) firstReader = null; else firstReaderHoldCount--; } else { HoldCounter rh = cachedHoldCounter; //如果当前的HoldCounter 为空, //或者和当前线程的HoldCounter 不一样 //则把当前线程的HoldCounter 赋给rh if (rh == null || rh.tid != getThreadId(current)) rh = readHolds.get(); int count = rh.count; //如果count=1,当前线程会真正的失去这把锁 if (count <= 1) { //把ThreadLocal中存的HoldCounter 移除避免内存泄漏 readHolds.remove(); if (count <= 0) throw unmatchedUnlockException(); } --rh.count; } //for循环去CAS读锁的状态,直到修改成功, //如果nextc=0,说明所有持有读锁的线程都已经把读锁释放了 //如果nextc != 0,说明还存在线程持有读锁 //返回true 就会执行doReleaseShared()方法 //该方法主要是去唤醒同步等待队列中的线程 //下面会详细讲解该方法 for (;;) { int c = getState(); int nextc = c - SHARED_UNIT; if (compareAndSetState(c, nextc)) // Releasing the read lock has no effect on readers, // but it may allow waiting writers to proceed if // both read and write locks are now free. return nextc == 0; } }
所有线程的读锁被释放后会去唤醒等待队列里的线程
private void doReleaseShared() { //这里的唤醒过程和ReentrantLock没有区别,用的同一个方法 for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; //ws==-1 说明有节点(线程)需要被唤醒 if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } if (h == head) // loop if head changed break; } }
注意:这里是重点,如果不仔细理解,很难弄明白读锁的传播(读读共享)
之前在获取读锁的过程讲到,获取读锁失败的线程会被包装成一个node节点入队到同步等待队列,那么我们就来看看入队的过程
核心源码如下
private void doAcquireShared(int arg) { //addWaiter(Node.SHARED), //该方法会把当前线程包装成一个node节点,并把该节点入队 //读锁节点,后续会根据是读写节点还是写锁节点来判断唤醒这个 *** 作是否继续往下传播 //注意 //注意 //如果你是来入队的,则可以按照顺序来分析源码 //如果你被唤醒的节点,你需要从线程阻塞的地方开始分析 final Node node = addWaiter(Node.SHARED); boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head) { //被唤醒后尝试去获取锁 int r = tryAcquireShared(arg); //大于等于0说明获取到了读锁 //有人会有疑问,为什么获取到的一定是读锁呢, //因为只有被唤醒的是需要读锁的线程,才会来到这个方法 //如果被唤醒的线程是需要写锁的,会在另外一个方法中 if (r >= 0) { //这里就是体现读读共享和读锁的传播属性的地方 //简单讲一下setHeadAndPropagate(node, r)的核心逻辑 //该方法会去判断下一个节点是不是共享节点(读锁节点), //如果是就调用doAcquireShared(int arg)(就是本方法) //说白了就是递归调用,直到遇到写锁节点就返回(读写互斥),停止递归调用, //即停止读锁的传播 setHeadAndPropagate(node, r); p.next = null; // help GC if (interrupted) selfInterrupt(); failed = false; return; } } //线程(节点)阻塞的地方,也是线程被唤醒后开始执行的地方 //把当前线程阻塞前,会把上一个节点的waitStatus改为-1, //这是告诉上一个节点,你出队的时候记得把我唤醒 //线程被唤醒后会再一次执行for循环 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }WriteLock 获取锁
相信看懂ReadLock的源码之后,就会发现WriteLock的源码是非常简单的
protected final boolean tryAcquire(int acquires) { Thread current = Thread.currentThread(); int c = getState(); int w = exclusiveCount(c); if (c != 0) { // (Note: if c != 0 and w == 0 then shared count != 0) //c!=0且w==0,说明,无写锁,有读锁,读写互斥,所以直接去同步等待队列入队 //w != 0,说明存在写锁,但是不是持有写锁的线程,因此也是直接去入队(写写互斥) if (w == 0 || current != getExclusiveOwnerThread()) return false; if (w + exclusiveCount(acquires) > MAX_COUNT) throw new Error("Maximum lock count exceeded"); // Reentrant acquire //写锁重入(有写锁,且持有写锁的线程是当前线程) setState(c + acquires); return true; } //writerShouldBlock()就是体现公平和非公平的区别 //公平:同步等待中有线程等待且不是当前线程,则入队等待 //非公平:先抢再说,抢不到我在入队等待 if (writerShouldBlock() || !compareAndSetState(c, c + acquires)) return false; //走到这一步说明,抢到了写锁并且还是第一次持有写锁 setExclusiveOwnerThread(current); return true; }
再来看看写锁入队的源码:acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
private Node addWaiter(Node mode) { //addWaiter(Node.EXCLUSIVE),声明一个独占锁(写锁)的节点 Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail; //pred为空,说明同步等待队列还没有初始化 if (pred != null) { //主要的工作就是将node节点添加到同步等待队列的尾部 node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } //初始化同步等待队列,这里不细讲,就是一个双向链表的初始化 enq(node); return node; }
acquireQueued(node, arg))这个方法的主要作用是写锁阻塞和被唤醒去抢锁
下面来看它的源码
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); //查看当前线程节点前面的节点是不是头结点 //如果是,说明该当前线程去抢锁了(tryAcquire(arg)) if (p == head && tryAcquire(arg)) { //抢到就把同步等待队列的头结点设为持有当前线程的节点 setHead(node); p.next = null; // help GC failed = false; return interrupted; } //抢锁失败,就阻塞,不过阻塞之前要把持有当前线程的节点的前面那个节点的, //waitStatus设为-1,告诉前面的那个节点,你执行了之后要唤醒我 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }WriteLock 释放锁
直接看源码
public final boolean release(int arg) { if (tryRelease(arg)) { //释放成功就唤醒头结点的下一个节点 //因为头结点代表当前线程(这是同步等待队列的一个设计的精妙之处) Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; }
protected final boolean tryRelease(int releases) { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); int nextc = getState() - releases; //free 为true,说明写锁完全释放 //为false 说明重入,返回false 还不能释放 boolean free = exclusiveCount(nextc) == 0; if (free) setExclusiveOwnerThread(null); setState(nextc); return free; }
结束语:在我看来,彻底理解了ReentrantWriteReadLock的获取锁和释放锁的逻辑之后,再去学习
ReentrantLock,Semaphore(信号量),CountDownLatch(发令q),CyclicBarrier(循环栅栏)会非常容易,
因为ReentrantWriteReadLock包含了独占锁的加锁解锁逻辑和共享锁的加锁解锁逻辑,这些并发控制工具类无非就是在这些的基础上进行修改,以满足相应的需求.如果ReentrantWriteReadLock理解有困难的话,建议先去看看ReentrantLock的源码,它是一个独占锁,不是特别复杂.
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)