前言
一、从一个代码示例开始二、Condition接口的实现分析
1、addConditionWaiter()方法的实现2、fullyRelease()方法的实现3、isonSyncQueue()方法的实现4、checkInterruptWhileWaiting()方法的实现5、await()方法的小总结6、signal方法的源码分析 总结
前言 在使用ReentrantLock锁的时候,我们通常使用Condition条件和其配合使用,从而实现线程间的通信。下面,从一个简单的线程交替执行的代码片段看一下Condition条件的实现原理。当然,Condition条件的实现也是通过AQS并发框架实现的,Condition的实现类ConditionObject是AQS的一个内部类,这篇文章的内容可以看作是之前的内容的补充。
一、从一个代码示例开始
先上一段代码,作用是实现A、B两个线程交替执行
public class ThreadMixOutTest2 { public static void main(String[] args) { Demo demo = new Demo(); for (int i = 0; i < 1000; i++) { new Thread(()->{ demo.incr(); },"A").start(); } for (int i = 0; i < 1000; i++) { new Thread(()->{ demo.decr(); },"B").start(); } } static class Demo{ //加减对象 private int a = 0; //锁 private Lock lock = new ReentrantLock(); //条件 private Condition condition = lock.newCondition(); public void incr(){ lock.lock(); try{ while(a != 0){ condition.await(); } System.out.println(Thread.currentThread().getName()+"线程"); a++; condition.signalAll(); }catch (Exception e){ e.printStackTrace(); }finally { lock.unlock(); } } public void decr(){ lock.lock(); try{ while(a == 0){ condition.await(); } System.out.println(Thread.currentThread().getName()+"线程"); a--; condition.signalAll(); }catch (Exception e){ e.printStackTrace(); }finally { lock.unlock(); } } } }
在上面的代码中,我们使用Condition的await()和signalAll()方法实现线程A和线程B的通信,点进方法进入的是Condition接口,就从这两个方法的原理看起。
二、Condition接口的实现分析下面这段代码是AQS中对await()方法的具体实现。前面讲过AQS中定义了一个Node内部类实现节点用于存储线程,通过节点的入队出队实现线程对共享资源的请求。同理,AQS定义了一个ConditionObject内部类实现Condition接口,先看下这个内部类定义。这里我只把变量定义拿了过来,具体的方法在用到的时候再分析。
public class ConditionObject implements Condition, java.io.Serializable { private static final long serialVersionUID = 1173984872572414699L; //条件队列的首节点,transient用于避免序列化 private transient Node firstWaiter; //条件队列的尾节点 private transient Node lastWaiter; //无参构造 public ConditionObject() { } //模式意味着退出等待时重新中断 private static final int REINTERRUPT = 1; //模式意味着退出等待时抛出InterruptedException private static final int THROW_IE = -1; }
我们可以清楚了解到,AQS中除了Node节点组成的CLH队列(双向线程等待队列)之外,还有一个Node节点组成的条件等待队列。还记得之前说Node类中有一个Node nextWaiter;属性吗?条件队列就是用nextWaiter指针指向下一个节点的。这个条件队列是个单向的。好了,清楚了条件队列的构成之后,我们看一下await()方法的具体实现。
public final void await() throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); //将等待线程加入条件队列 Node node = addConditionWaiter(); //释放当前节点的所有重入锁 int savedState = fullyRelease(node); int interruptMode = 0; //如果当前节点不在同步队列,注意不是条件队列,是那个CLH while (!isOnSyncQueue(node)) { //将当前线程阻塞 LockSupport.park(this); //注意,当前线程被阻塞后,这里一下就不会执行了,直到被signal唤醒或者被中断 //检查线程被唤醒是否因为中断,如果是中断那么跳出循环 //如果是被其他线程signal唤醒的,那么也会在下次循环退出,因为被唤醒就代表节点进入CLH队列 //注意interruptMode==0说明没有中断 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } //如果acquireQueued返回true,注意这里true代表获取锁的过程发生中断 //并且中断模式不是抛出异常 if (acquireQueued(node, savedState) && interruptMode != THROW_IE) //中断模式赋值为重新中断 interruptMode = REINTERRUPT; if (node.nextWaiter != null) // clean up if cancelled //节点不是等待状态,从条件队列去除 unlinkCancelledWaiters(); //这里的判断是因为跳出while循环有两种情况,中断或唤醒,这里要区分一下是不是唤醒 if (interruptMode != 0) //是因为中断,那么就重新中断当前线程 reportInterruptAfterWait(interruptMode); }1、addConditionWaiter()方法的实现
具体功能就是将当前线程封装成一个等待节点加入条件队列
private Node addConditionWaiter() { //获取条件队列尾节点 Node t = lastWaiter; // If lastWaiter is cancelled, clean out. //尾节点不为null且尾节点不是等待状态 if (t != null && t.waitStatus != Node.CONDITION) { //将不是等待状态的节点断开链接(队列将其抛弃) unlinkCancelledWaiters(); //获取过滤后的新的尾节点 t = lastWaiter; } //新建一个等待状态且包含当前线程的节点 Node node = new Node(Thread.currentThread(), Node.CONDITION); //条件队列实际是空的 if (t == null) //将当前节点设为头节点 firstWaiter = node; else //将原来的尾节点后连接上当前节点 t.nextWaiter = node; //新的尾节点指向当前节点 lastWaiter = node; //返回当前节点 return node; } //unlinkCancelledWaiters方法分析(将不是等待状态的节点去除) private void unlinkCancelledWaiters() { //条件队列头节点 Node t = firstWaiter; //是一个记录节点,用于记录头节点 Node trail = null; //头节点存在 while (t != null) { //头节点的下一个节点 Node next = t.nextWaiter; //如果头节点不是等待状态 if (t.waitStatus != Node.CONDITION) { //断开头节点 t.nextWaiter = null; //trail为空 if (trail == null) //将下一个节点赋值给头节点 firstWaiter = next; else //将头节点的下一个节点连接到是等待状态的节点上 trail.nextWaiter = next; //已经到达尾节点 if (next == null) lastWaiter = trail; } //如果头节点是等待状态,那么就要向后找 else //将头节点赋值给trail记录下来 trail = t; //头节点后移 t = next; } }2、fullyRelease()方法的实现
只有拥有锁的线程才能调用await()方法,因此该线程必然是同步队列的头节点。该方法的主要作用是将当前锁的重入次数全部释放,即将上一步加入条件队列的线程在同步队列中释放
final long fullyRelease(Node node) { //释放失败标志位 boolean failed = true; try { //获取state的值 long savedState = getState(); //释放成功,这里release实际调用tryRelease方法,tryRelease方法是通过子类实现的模板方法,在ReentrantLock中tryRelease方法若返回true,则state值置为0 if (release(savedState)) { failed = false; //返回重入数量,这里是0 return savedState; } else { throw new IllegalMonitorStateException(); } } finally { if (failed) node.waitStatus = Node.CANCELLED; } }3、isonSyncQueue()方法的实现
主要是判断当前节点是否在同步队列中,在同步队列返回true,否则返回false
final boolean isOnSyncQueue(Node node) { //node节点的prev和next属性都是在CLH队列中使用 //当前节点处于等到状态或者当前节点不在同步队列 if (node.waitStatus == Node.ConDITION || node.prev == null) //返回false进入while循环 return false; //node节点存在下个节点,说明处于同步队列 if (node.next != null) // If has successor, it must be on queue return true; //从同步队列的尾节点开始向前遍历,找到返回true,否则为false return findNodeFromTail(node); } private boolean findNodeFromTail(Node node) { //同步队列的尾节点 Node t = tail; //自旋 for (;;) { //如果当前节点在同步队列中 if (t == node) return true; //尾节点为空,同步队列为空 if (t == null) return false; //向前遍历,对比当前节点 t = t.prev; } }4、checkInterruptWhileWaiting()方法的实现
这段代码的作用是检查等待线程是否被中断,外层用一个三目运算符判断是否中断,可以看到非中断是0;如果中断,再根据节点判断是抛出异常还是重新中断
private int checkInterruptWhileWaiting(Node node) { return Thread.interrupted() ? (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) : 0; } //这个方法用来判断在线程中断后,是抛出异常还是重新中断 //该方法的返回值代表当前线程是否在park的时候被中断唤醒,根据await的语义,在await时遇到中断要抛出InterruptedException final boolean transferAfterCancelledWait(Node node) { //若cas成功,说明中断发生时,没有signal调用,因为signal方法会将状态置为0 if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) { //将node加入CLH队列 enq(node); //返回true,表示中断发生在signal之前 return true; } //如果cas失败,检查node是否在CLH队列 while (!isOnSyncQueue(node)) //不在CLH队列,则先让其他线程执行 Thread.yield(); //知道当前node被signal方法添加到CLH队列,返回false return false; }5、await()方法的小总结
调用await方法,将当前线程封装成Node.ConDITION 类型的Node节点并添加到条件队列的尾部,释放当前线程获取的锁(注意这里是把重入次数全部释放),判断当前线程是否在同步队列中,不在的话使用park将线程挂起,退出while循环后,根据是唤醒跳出循环还是中断跳出循环具体分析后续的 *** 作。
上面我们对await方法的实现进行了源码分析,在小总结里遗留了一些对await方法跳出循环后处理的并没有详细说明,现在,通过对signal方法进行分析,对上面内容做一个补充。
6、signal方法的源码分析public final void signal() { //判断当前线程是否持有独占锁 if (!isHeldExclusively()) throw new IllegalMonitorStateException(); //条件队列头节点 Node first = firstWaiter; //头节点不是null if (first != null) //唤醒头节点 doSignal(first); } //唤醒头节点的doSignal方法 private void doSignal(Node first) { do { //将头节点的下个节点赋值给头节点,如果是null,说明条件队列只有一个节点 if ( (firstWaiter = first.nextWaiter) == null) //条件队列的尾节点置为空 lastWaiter = null; //将条件队列的头节点出队 first.nextWaiter = null; } //头节点转移到同步队列失败且条件队列不为空,循环继续 while (!transferForSignal(first) && (first = firstWaiter) != null); } //将节点从条件队列转移到同步队列,成功返回true;如果返回false,说明节点在被signal唤醒之前就取消 final boolean transferForSignal(Node node) { //如果不能改变节点状态值,说明该节点已经不是等待状态 if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) return false; //调用enq方法将节点加入同步队列队尾,注意这里返回的是同步队列的倒数第二个节点 Node p = enq(node); //获取p节点的等待状态 int ws = p.waitStatus; //ws>0说明p节点处于canceled状态 //cas失败说明唤醒p节点失败 //如果是这两种情况,那么就把当前节点线程唤醒,这样做是因为在同步队列中,如果当前节点的前一个节点处于取消状态或者无法被唤醒,那么其后的节点也无法被唤醒 if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) LockSupport.unpark(node.thread); return true; } //再来看一下doSignalAll方法,Removes and transfers all nodes,将所有节点从条件队列转移到同步队列,和doSignal方法的区别在while循环中,doSignal遇到一个节点转移成功就停止,doSignalAll是直到条件队列为空 private void doSignalAll(Node first) { lastWaiter = firstWaiter = null; do { Node next = first.nextWaiter; first.nextWaiter = null; transferForSignal(first); first = next; } while (first != null); }
signal方法和signalAll方法的区别在于循环的结束条件上,signal方法遇到转移成功的节点就停止,而signalAll方法需要条件队列为空才停止。下面,我们再看一下await方法线程阻塞后的代码
int interruptMode = 0; while (!isOnSyncQueue(node)) { //线程挂起了 LockSupport.park(this); //判断是被唤醒还是被中断 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } //acquireQueued返回true代表获取锁的过程中发生中断 if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode);
在线程被挂起后,后续的 *** 作就不再执行,除非被唤醒或者被中断,这里就要具体分析一下:
如果interruptMode ==0,说明是被唤醒,这时通过while循环判断当前线程节点进入了同步队列,结束循环,进入第一个if判断,如果acquireQueued返回true,由于此时interruptMode ==0,所以,interruptMode会被赋值为REINTERRUPT,最终线程会进行自我中断。如果interruptMode !=0,说明发生中断,分为两种情况,一种是signal之前中断,此时interruptMode==THROW_IE,那么就会抛出线程中断异常,另一种是在signal之后中断,此时interruptMode == REINTERRUPT,那么就会执行线程的自我中断。 总结
进入await方法时线程一定持有锁,离开await方法时同样是持有了锁调用await方法时会是当前线程被封装成Node节点加入条件队列队尾,然后释放持有的锁,释放锁后,当前线程在条件队列挂起,等待signal或者中断线程被唤醒后会离开条件队列进入同步队列中竞争锁,若竞争到锁之前发生中断,则根据具体的中断模式判断是抛出异常还是自我中断线程在竞争到锁之后进行一系列后续 *** 作,包括离开条件队列,处理中断异常中断或signal都是将线程从条件队列移除,加入同步队列去竞争锁,不同的是,signal是正常唤醒,中断是非正常唤醒。如果中断发生在signal之前,则在最终返回时需要抛出异常;如果中断发生在signal之后,线程实际已被唤醒,我们可以忽略这个中断,在await方法结束时自我中断一下Condition的原理是ConditionObject实现类内部维护一个条件队列(单向),在获取锁的情况下线程调用await方法,线程会进入条件队列并被阻塞。直到其他线程调用signal/signalAll方法唤醒,线程被唤醒后进入同步队列,参与竞争锁
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)