- (1)基础结构
- (2)构造函数
- (3)countDown
- (4)await
上面展示了CountDownLatch 的类结构,其中有一个静态内部类Sync。
这个类正是继承自 AQS,并且重写了tryAcquireShared、tryReleaseShared方法。在 Sync 类里,根据是否是独占,来重写对应的方法。如果是独占,则重写 tryAcquire 或 tryRelease 等方法;如果是非独占,则重写 tryAcquireShared 和 tryReleaseShared 等方法”。那么从这里可以看出CountDownLatch 属于非独占的类型。
首先我们需要知道CountDownLatch的count在使用上有什么意义,它是 JDK 提供的并发流程控制的工具类,它是在 java.util.concurrent 包下,在 JDK1.5 以后加入的。 CountDownLatch 的核心思想,等到一个设定的数值达到之后,才能出发。CountDownLatch允许一个或多个线程等待其他线程完成 *** 作。
继续往下看,其中调用了父类也就是AbstractQueuedSynchronizer的setState方法,所以我们通过 CountDownLatch 构造函数将传入的 count 最终传递到 AQS 内部的 state 变量,给 state 赋值,state 就代表还需要倒数的次数。
在 countDown 方法中调用的是 sync 的 releaseShared 方法:
public final boolean releaseShared(int arg) { //尝试以共享模式去释放锁 if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; }
接下来在看下tryReleaseShared方法
protected boolean tryReleaseShared(int releases) { //不断循环检查 *** 作状态 for (;;) { int c = getState(); //如果为0则说明没有未持有资源,则不能释放 if (c == 0) return false; // 执行自减 *** 作,直到state为0 int nextc = c-1; //CAS更新state值 if (compareAndSetState(c, nextc)) return nextc == 0; } }
【1】方法体内是一个for的死循环,在循环体中,最开始是通过 getState 拿到当前 state 的值并赋值给变量 c,如果为0则说明没有未持有资源,则不能释放,直接返回 false,相当于 releaseShared 方法不产生效果,也就意味着 countDown 方法不产生效果。
【2】如果 c 不等于 0,在这里会先把 c-1 的值赋给 nextc,然后再利用 CAS 尝试把 nextc 赋值到 state 上。如果赋值成功就代表本次 countDown 方法 *** 作成功,也就意味着把 AQS 内部的 state 值减了 1。最后,是 return nextc == 0,如果 nextc 为 0,意味着本次倒数后恰好达到了规定的倒数次数,门闩应当在此时打开,所以 tryReleaseShared 方法会返回 true,那么再回到之前的 releaseShared 方法中,可以看到,接下来会调用 doReleaseShared 方法,效果是对之前阻塞的线程进行唤醒,让它们继续执行。
【3】注意最后一行代码return nextc == 0;这意味着只有在当前CountDownLatch 的state的计数器为0时才会返回true,一旦 tryReleaseShared 方法 return true,则 releaseShared 方法会调用 doReleaseShared 方法,把所有之前阻塞的线程都唤醒。这就是意味着在在执行countDown的时候如果此时state不为0的话,那么线程都会阻塞
(4)await该方法是 CountDownLatch 的“获取”方法,调用 await 方法会把线程阻塞,直到倒数为 0 才能继续执行。await 方法和 countDown 是配对的
1:可响应中断
2:非独占模式
public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg); }
protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; }
以上代码一方面响应了对于中断的处理,另一方面调用了 tryAcquireShared 方法。这个方法很简单,它会直接判断 getState 的值是不是等于 0,如果等于 0 就返回 1,不等于 0 则返回 -1。
getState 方法获取到的值是剩余需要倒数的次数,如果此时剩余倒数的次数大于 0,那么 getState 的返回值自然不等于 0,因此 tryAcquireShared 方法会返回 -1,一旦返回 -1,再看到 if (tryAcquireShared(arg) < 0) 语句中,就会符合 if 的判断条件,并且去执行 doAcquireSharedInterruptibly 方法,然后会让线程进入阻塞状态。
private void doAcquireSharedInterruptibly(int arg) //doAcquireSharedInterruptibly会将线程阻塞放入链表队列中 throws InterruptedException { //将当前线程放入到等待队列中 final Node node = addWaiter(Node.SHARED); boolean failed = true; try { //自旋 for (;;) { //获取当前节点的上一个节点 final Node p = node.predecessor(); //如果该节点为头结点,则尝试再次获取锁 if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC failed = false; return; } } //获取锁失败后判断是否应该挂起该线程 if (shouldParkAfterFailedAcquire(p, node) && //挂起线程,即处于等待状态 parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)