并发编程Condition的原理及源码解读

并发编程Condition的原理及源码解读,第1张

并发编程工具 condition条件控制

​ 学习 synchronized 的时候,有 wait/notify 的基本使用,结合 synchronized 可以实现对线程的通信。那么这个时候我就在思考了,既然 J.U.C 里面提供了锁的实现机制,那 J.U.C 里面有没有提供类似的线程通信的工具呢?于是找阿找,发现了一个 Condition 工具类。Condition 是一个多线程协调通信的工具类,可以让某些线程一起等待某个条件(condition),只有满足条件时,线程才会被唤醒。

Condition 的基本使用

以生产者和消费者为例子:

Producer:

public class Producer implements Runnable{

    private Queue<String> msg;

    private int maxSize;

    Lock lock;
    Condition condition;

    public Producer(Queue<String> msg, int maxSize, Lock lock, Condition condition) {
        this.msg = msg;
        this.maxSize = maxSize;
        this.lock = lock;
        this.condition = condition;
    }

    @Override
    public void run() {
        int i=0;
        while(true){
            i++;
            lock.lock();
                while(msg.size()==maxSize){
                    System.out.println("生产者队列满了,先等待");
                    try {
                        condition.await(); //阻塞线程并释放锁
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("生产消息:"+i);
                msg.add("生产者的消息内容"+i);
                condition.signal(); //唤醒阻塞状态下的线程
            lock.unlock();
        }
    }
}

Consumer:

public class Producer implements Runnable{

    private Queue<String> msg;

    private int maxSize;

    Lock lock;
    Condition condition;

    public Producer(Queue<String> msg, int maxSize, Lock lock, Condition condition) {
        this.msg = msg;
        this.maxSize = maxSize;
        this.lock = lock;
        this.condition = condition;
    }

    @Override
    public void run() {
        int i=0;
        while(true){
            i++;
            lock.lock();
                while(msg.size()==maxSize){
                    System.out.println("生产者队列满了,先等待");
                    try {
                        condition.await(); //阻塞线程并释放锁
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("生产消息:"+i);
                msg.add("生产者的消息内容"+i);
                condition.signal(); //唤醒阻塞状态下的线程
            lock.unlock();
        }
    }
}

调用类

public class App 
{
    public static void main( String[] args )
    {
        Queue<String> queue=new LinkedList<>();
        Lock lock=new ReentrantLock(); //重入锁
        Condition condition=lock.newCondition();
        int maxSize=5;

        Producer producer=new Producer(queue,maxSize,lock,condition);
        Consumer consumer=new Consumer(queue,maxSize,lock,condition);

        Thread t1=new Thread(producer);
        Thread t2=new Thread(consumer);
        t1.start();
        t2.start();

    }
}

总结:生产者通过lock.lock()加锁,等消息队列满了后执行condition.await()阻塞线程并释放锁,然后通过condition.signal消费者的进行消费…

condition原理 codition.await

看看源码

 public final void await() throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
     //创建一个新的节点,节点状态为 condition,采用的数据结构仍然是表
            Node node = addConditionWaiter();
     //释放当前的锁,得到锁的状态,并唤醒 AQS 队列中的一个线程
            int savedState = fullyRelease(node);
     //如果当前节点没有在同步队列上,即还没有被 signal,则将当前线程阻塞
            int interruptMode = 0;
            while (!isOnSyncQueue(node)) {{//判断这个节点是否在 AQS 队列上,第一次判断的是 false,因为前面已经释放锁了
                LockSupport.park(this);//通过 park 挂起当前线程
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
     // 当这个线程醒来,会尝试拿锁, 当 acquireQueued 返回 false 就是拿到锁了.
     // interruptMode != THROW_IE -> 表示这个线程没有成功将 node 入队,但 signal 执行了 enq 方法让其入队了.
     // 将这个变量设置成 REINTERRUPT.
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
     // 如果 node 的下一个等待者不是 null, 则进行清理,清理 Condition 队列上的节点. 
     // 如果是 null ,就没有什么好清理的了.
            if (node.nextWaiter != null) // clean up if cancelled
                unlinkCancelledWaiters();
        // 如果线程被中断了,需要抛出异常.或者什么都不做
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
        }
addConditionWaiter

这个方法的主要作用是把当前线程封装成 Node,添加到等待队列。这里的队列不再是双向链表,而是单向链表.

private Node addConditionWaiter() {
            Node t = lastWaiter;
            // If lastWaiter is cancelled, clean out.
    // 如 果 lastWaiter 不 等 于 空 并 且waitStatus 不等于 CONDITION时,把冲好这个节点从链表中移除
            if (t != null && t.waitStatus != Node.CONDITION) {
                unlinkCancelledWaiters();
                t = lastWaiter;
            }
    //构建一个 Node,waitStatus=CONDITION。这里的链表是一个单向的,所以相比 AQS 来说会简单很多
            Node node = new Node(Thread.currentThread(), Node.CONDITION);
            if (t == null)
                firstWaiter = node;
            else
                t.nextWaiter = node;
            lastWaiter = node;
            return node;
        }

执行完 addConditionWaiter 这个方法之后,就会产生一个这样的 condition 队列

fullyRelease

​ fullRelease,就是彻底的释放锁,什么叫彻底呢,就是如果当前锁存在多次重入,那么在这个方法中只需要释放一次就会把所有的重入次数归零。

final int fullyRelease(Node node) {
        boolean failed = true;
        try {
            //获得重入的次数
            int savedState = getState();
            //释放锁并且唤醒下一个同步队列中的线程 release(savedState)
            if (release(savedState)) {
                failed = false;
                return savedState;
            } else {
                throw new IllegalMonitorStateException();
            }
        } finally {
            if (failed)
                node.waitStatus = Node.CANCELLED;
        }
    }
isOnSyncQueue

判断当前节点是否在同步队列中,返回 false 表示不在,返回 true 表示在如果不在 AQS 同步队列,说明当前节点没有唤醒去争抢同步锁,所以需要把当前线程阻塞起来,直到其他的线程调用 signal 唤醒,如果在 AQS 同步队列,意味着它需要去竞争同步锁去获得执行程序执行权限,为什么要做这个判断呢?原因是在 condition 队列中的节点会重新加入到 AQS 队列去竞争锁。也就是当调用 signal的时候,会把当前节点从 condition 队列转移到 AQS 队列

➢ 大家思考一下,基于现在的逻辑结构。如何去判断

ThreadA 这个节点是否存在于 AQS 队列中呢?

  1. 如果 ThreadA 的 waitStatus 的状态为 CONDITION,说

明它存在于 condition 队列中,不在 AQS 队列。因为

AQS 队列的状态一定不可能有 CONDITION

  1. 如果 node.prev 为空,说明也不存在于 AQS 队列,原因

是 prev=null 在 AQS 队列中只有一种可能性,就是它是

head 节点,head 节点意味着它是获得锁的节点。

  1. 如果 node.next 不等于空,说明一定存在于 AQS 队列

中,因为只有 AQS 队列才会存在 next 和 prev 的关系

  1. findNodeFromTail,表示从 tail 节点往前扫描 AQS 队列,一旦发现 AQS 队列的节点和当前节点相等,说明节点一定存在于 AQS 队列中
final boolean isOnSyncQueue(Node node) {
        if (node.waitStatus == Node.CONDITION || node.prev == null)
            return false;
        if (node.next != null)
            return true;
        return findNodeFromTail(node);
    }
Condition.signal

await 方法会阻塞 ThreadA,然后 ThreadB 抢占到了锁获得了执行权限,这个时候在 ThreadB 中调用了 Condition的 signal()方法,将会唤醒在等待队列中节点

doSignal
private void doSignal(Node first) {
    do {
        //从 Condition 队列中删除 first 节点
        if ( (firstWaiter = first.nextWaiter) == null)
            lastWaiter = null; // 将 next 节点设置成 null
        first.nextWaiter = null;
    } while (!transferForSignal(first) &&
             (first = firstWaiter) != null);
}
transferForSignal

该方法先是 CAS 修改了节点状态,如果成功,就将这个节点放到 AQS 队列中,然后唤醒这个节点上的线程。此时,那个节点就会在 await 方法中苏醒

final boolean transferForSignal(Node node) {
       
        if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))//更新节点的状态为 0,如果更新失败,只有一种可能就是节点被 CANCELLED 了
            return false;
        Node p = enq(node);//调用 enq,把当前节点添加到AQS 队列。并且返回返回按当前节点的上一个节点,也就是原tail 节点
        int ws = p.waitStatus;
    // 如果上一个节点的状态被取消了, 或者尝试设置上一个节点的状态为 SIGNAL 失败了(SIGNAL 表示: 他的 next 节点需要停止阻塞),
        if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
            LockSupport.unpark(node.thread);// 唤醒节点上的线程
        return true; //如果 node 的 prev 节点已经是signal 状态,那么被阻塞的 ThreadA 的唤醒工作由 AQS 队列来完成
    }

图解分析

执行完 doSignal 以后,会把 condition 队列中的节点转移到 aqs 队列上,逻辑结构图如下这个时候会判断 ThreadA 的 prev 节点也就是 head 节点的 waitStatus,如果大于 0 或者设置 SIGNAL 失败,表示节点被设置成了 CANCELLED 状态。这个时候会唤醒ThreadA 这个线程。否则就基于 AQS 队列的机制来唤醒,也就是等到 ThreadB 释放锁之后来唤醒 ThreadA

被阻塞的线程唤醒后的逻辑

前面在分析 await 方法时,线程会被阻塞。而通过 signal被唤醒之后又继续回到上次执行的逻辑中标注为红色部分

的代码checkInterruptWhileWaiting 这个方法是干嘛呢?其实从名字就可以看出来,就是 ThreadA 在 condition 队列被阻塞的过程中,有没有被其他线程触发过中断请求

checkInterruptWhileWaiting

​ 如果当前线程被中断,则调用transferAfterCancelledWait 方法判断后续的处理应该是抛出 InterruptedException 还是重新中断。这里需要注意的地方是,如果第一次 CAS 失败了,则不能判断当前线程是先进行了中断还是先进行了 signal 方法的调用,可能是先执行了 signal 然后中断,也可能是先执行了中断,后执行了 signal,当然,这两个 *** 作肯定是发生在 CAS 之前。这时需要做的就是等待当前线程的 node被添加到 AQS 队列后,也就是 enq 方法返回后,返回false 告诉 checkInterruptWhileWaiting 方法返回REINTERRUPT(1),后续进行重新中断。

​ 简单来说,该方法的返回值代表当前线程是否在 park 的时候被中断唤醒,如果为 true 表示中断在 signal 调用之前,signal 还未执行,那么这个时候会根据 await 的语义,在 await 时遇到中断需要抛出interruptedException,返回 true 就是告诉checkInterruptWhileWaiting 返回THROW_IE(-1)。如果返回 false,否则表示 signal 已经执行过了,只需要重新响应中断即可.

        private int checkInterruptWhileWaiting(Node node) {
            return Thread.interrupted() ?
                (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
                0;
        }
final boolean transferAfterCancelledWait(Node node) {
    //使用 cas 修改节点状态,如果还能修改成功,说明线程被中断时,signal 还没有被调用。
// 这里有一个知识点,就是线程被唤醒,并不一定是在 java 层面执行了locksupport.unpark,也可能是调用了线程的 interrupt()方法,这个方法会更新一个中断标识,并且会唤醒处于阻塞状态下的线程。
        if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
            enq(node); //如果 cas 成功,则把node 添加到 AQS 队列
            return true;
        }
    //如果 cas 失败,则判断当前 node 是否已经在 AQS 队列上,如果不在,则让给其他线程执行
//当 node 被触发了 signal 方法时,node 就会被加到 aqs 队列上
        while (!isOnSyncQueue(node))//循环检测 node 是否已经成功添加到 AQS 队列中
            Thread.yield();//如果没有,则通过 yield,
        return false;
    }

acquireQueued

这个方法在讲 aqs 的时候说过,是的当前被唤醒的节点ThreadA 去抢占同步锁。并且要恢复到原本的重入次数状态。调用完这个方法之后,AQS 队列的状态如下将 head 节点的 waitStatus 设置为-1,Signal 状态

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-ISRN9n1a-1652511876217)(D:/资料/java资料/笔记/msg/image-20220514113338649.png)]

reportInterruptAfterWait

根据 checkInterruptWhileWaiting 方法返回的中断标识来进行中断上报。

如果是 THROW_IE,则抛出中断异常

如果是 REINTERRUPT,则重新响应中断

private void reportInterruptAfterWait(int interruptMode)
    throws InterruptedException {
    if (interruptMode == THROW_IE)
        throw new InterruptedException();
    else if (interruptMode == REINTERRUPT)
        selfInterrupt();
}
Condition 总结

await 和 signal 的总结

我把前面的整个分解的图再通过一张整体的结构图来表述,线程 awaitThread 先通过 lock.lock()方法获取锁成功后调用了 condition.await 方法进入等待队列,而另一个线程 signalThread 通过 lock.lock()方法获取锁成功后调用了 condition.signal 或者 signalAll 方法,使得线程awaitThread 能够有机会移入到同步队列中,当其他线程释放 lock 后使得线程 awaitThread 能够有机会获取lock,从而使得线程 awaitThread 能够从 await 方法中退出执行后续 *** 作。如果 awaitThread 获取 lock 失败会直接进入到同步队列

欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/langs/923406.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-05-16
下一篇 2022-05-16

发表评论

登录后才能评论

评论列表(0条)

保存