java多线程 wait、notify详解 保护性暂停模式和生产者消费者模式

java多线程 wait、notify详解 保护性暂停模式和生产者消费者模式,第1张

java多线程 wait、notify详解 保护性暂停模式和生产者消费者模式

目录

概述

API介绍

基本使用

sleep和wait的区别

正确姿势

同步模式

保护性暂停设计模式

增加超时效果

join的原理

多对保护性暂停

异步模式

生产者消费者


概述

先来简单回顾一下管程Monitor,如上图Monitor的Owner是现在锁的持有者;EntryList是那些争抢锁但是没有争抢到的一些阻塞了的线程,线程的状态为BLOCKED;WaitSet里存放的则是曾经获得过锁,曾经是Monitor的Onwer,但是运行的时候条件不满足,只能先放弃锁的线程,WaitSet里面的线程的状态是WAITING或TIMED_WAITING状态(在 *** 作系统层面这两种也是阻塞的)。

Owner线程发现条件不满足,调用wait方法,即可进入WaitSet变为WAITING状态BLOCKED和WAITING的线程都处于阻塞状态,不占用CPU时间片BLOCKED线程会在Owner释放锁时唤醒WAITNG线程会在Owner线程调用notify或notifyAll时唤醒,但唤醒后并不意味者立刻获得锁仍需进入EntryList重新竞争,所以唤醒的语义其实就是让线程进入EntryList中,因为Monitor的Owner释放锁后,EntryList中的线程可以主动的争夺锁,而WaitSet是不会主动争夺锁的。当线程从owner进入到WaitSet时,会释放锁,并通知EntryList中的线程争抢锁

API介绍

它们都是线程之间进行协作的手段,都属于Object对象的方法。必须获得此对象的锁(成为Monitor的主人),才能调用这几个方法,不是锁的拥有者会抛异常java.lang.IllegalMonitorStateException。所以必须在synchronized里获取到对象的锁才能使用(必须放在synchronized同步块中)。

obj.wait(0)让当前线程进入obj 关联的Monitor中的WaitSet等待,0可省略obj.wait(long) 有时限的等待,单位ms,超过时间后自动从WaitSet进入到EntryListobj.notify() 唤醒正在 obj关联的Monitor中的WaitSet中的一个线程,挑选一个唤醒obj.notifyAll() 唤醒obj Monitor中所有WaitSet中等待的线程。

如果是偏向锁,使用wait notify 会膨胀为重量级锁,因为需要Monniter,只有重量级锁才有Monitor

基本使用
public class WaitAndNotify {
    static Object lock = new Object();

    public static void main(String[] args) throws InterruptedException {
        new Thread(() -> {
            log.debug("start...");
            synchronized (lock) {
                try {
                    lock.wait();
                    log.debug("end...");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        },"t1").start();
        new Thread(() -> {
            log.debug("start...");
            synchronized (lock) {
                try {
                    lock.wait();
                    log.debug("end...");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        },"t2").start();
        Thread.sleep(1000);
        synchronized (lock) {
            lock.notify();//只会随机的唤醒其中一个,另一个会一直在waitset等待,程序一直不结束
//            lock.notifyAll();//会全部唤醒,程序立即结束
        }
    }
}

sleep和wait的区别

sleep(long n)和wait(long n)

sleep是Thread的静态方法,wait是Object的方法sleep不需要强制和synchronized配合使用,但wait需要sleep在睡眠的时候不会释放对象锁,但wait等待的时候会释放对象锁

正确姿势

notify是随机挑选一个WaitSet里的线程唤醒,可能存在虚假唤醒,可以用notifyAll。

但是notifyAll可能让一些条件还没满足的线程唤醒,所以需要下面的代码解决这种问题,利用循环查看条件

模板:

synchronized(lock){
	while(条件不成立){ 
        lock.wait();
	}
    //干活
}

//另一个线程
synchronized(lock){
    lock.notifyAll();
}

下面是关于该wait、notify模板的例子。

同步模式 保护性暂停设计模式

即Guarded Suspension ,用于一个线程等待另一个线程的执行结果。

要点:

有一个结果需要从一个线程传递到另一个线程,让他们关联同一个GuardedObject如果有多个结果不断从一个线程到另一个线程那么可以使用消息队列(见生产者/消费者)JDK中,join的实现、Future的实现,采用的就是此模式因为要等待另一方的结果,因此归类到同步模式该设计模式和join等待结果的区别是:

join方式需要一直等待设置结果的线程运行完,才能获取结果,浪费资源join方式存放结果的变量必须为全局变量,有所限制

保护性暂停模式图

代码如下:线程t1等待线程t2的结果,通过一个GuardedObject中间对象实现

public class Clint {
    public static void main(String[] args) {
        //两个线程共用同一个GuardedObject对象
        GuardedObject guardedObject = new GuardedObject();
        new Thread(() -> {
            //等待结果。。。
            Object result = guardedObject.getResult();
            System.out.println("得到结果了..." + result);
        }, "t1").start();
        new Thread(() -> {
            try {
                //模拟产生结果的动作
                System.out.println("产生结果...");
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            guardedObject.setResult(new String("result1"));
        }, "t2").start();
    }
}

class GuardedObject {
    //结果
    private Object result;

    //得到结果的方法
    public Object getResult() {
        //wait、notify模板
        synchronized (this) {
            //如果结果还没有结果的话就继续等待
            while (result == null) {
                try {
                    this.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            return result;
        }
    }

    //产生结果的方法
    public void setResult(Object result) {
        synchronized (this) {
            //给结果赋值
            this.result = result;
            this.notifyAll();
        }
    }
}
增加超时效果

实现有限等待:当太久等不到结果时,就不等了

public Object getResult(long timeout) {
    //记录开始时间
    long startTime = System.currentTimeMillis();
    long passedTime = 0;
    synchronized (this) {
        //如果结果来了直接退出循环
        while (result == null) {
            //超过了指定的等待时间就直接退出循环
            if (passedTime >= timeout) {
                break;
            }
            try {
                //实现有限等待,但是等待完成后还是会进如循环,所以要记录时间,超时了再结束
                //为了防止虚假唤醒,所以等待的时间应该是timeout - passedTime
                this.wait(timeout - passedTime);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            //经历的时间,可以防止虚假唤醒
            passedTime = System.currentTimeMillis() - startTime;
        }
        return result;
    }
}

这段代码的难点就是需要防止虚假唤醒,不能让等待线程多等。

join的原理

join采用了保护性暂停。就是join()方法是同步方法,主线程调用t1.join(),t1对象被主线程加了锁成了锁对象,可以当做前面例子中的obj;调用wait()方法是对所在的线程即主线程生效。jvm会在线程执行完成后帮我们调用notifyAll()方法,这样主线程就会重回Runnable状态,时间片分配后主线程可执行join方法之后的代码

			//注意有个synchronized,锁的是当前线程要等待的那个线程对象
public final synchronized void join(long millis)
throws InterruptedException {
    long base = System.currentTimeMillis();
    long now = 0;

    if (millis == 0) {  
        while (isAlive()) {//如果发现要等待的线程,就是本类对象,还存活,继续等待,防止虚假唤醒
            wait(0); //哪个线程里面调用的此方法。就是哪个线程进入等待
        }
    } else {	//设置超时效果
        while (isAlive()) {
            long delay = millis - now;
            if (delay <= 0) {  //如果等够了,那就不等了
                break;  //打断循环退出方法,进入线程继续执行
            }
            wait(delay);
            now = System.currentTimeMillis() - base;
        }
    }
}

多对保护性暂停

图中Futures 就好比居民楼一层的信箱(每个信箱有房间编号),左侧的t0,t2,t4就好比等待邮件的居民,右侧的t1, t3,t5就好比邮递员

如果我们使用原来的GuardedObject不是很方便,因为如果是在多个类中,那么GuardedObject对象就需要在多个类中存在,作为参数传递不是很方便,因此设计一个用来解耦的中间类,这样不仅能够解耦【结果等待者】和【结果生产者】,还能够同时支持多个任务的管理

 代码:

扩展GuardedObject:

class GuardedObject {
    //结果
    private Object result;
    //每个对象保存一个唯一的id
    private int id;

    public GuardedObject(int id) {
        this.id = id;
    }

    public int getId() {
        return id;
    }
    public Object getResult(){……}
    public Object getResult(long timeout){……} 
    public void setResult(Object result){……} 
}

中间容器类

class MailBox {
    //存储多个GuardedObject对象的容器,需要保证线程安全
    private static Map map = new ConcurrentHashMap<>();
    private static int id;

    //产生id
    private static int generateId() {
        return id++;
    }
    //创建GuardedObject对象,由本类来生成它的id
    public static GuardedObject createGuardedObject() {
        GuardedObject go = new GuardedObject(generateId());
        map.put(go.getId(),go);
        return go;
    }
    //根据id获取到容器里的GuardedObject
    public static GuardedObject getGuardedObject(int id){
        return map.remove(id); //需要删除了
    }
    public static Set getIds(){
        return map.keySet();
    }
}

Person类:

@Slf4j
class Person extends Thread {
    private GuardedObject guardedObject;
    @Override
    public void run() {
        log.debug("等信 {}号信箱。。", guardedObject.getId());
        Object o = guardedObject.getResult(5000);//线程开始后,等待对应的信箱收到信
        log.debug("收到信了{},{}", guardedObject.getId(), o);
    }
    //给每个对象创建不同的信箱
    public Person() {
        this.guardedObject = MailBox.createGuardedObject();
    }
}

Postman类:

@Slf4j
public class Postman extends Thread{
    private Integer id;//要送信的对应信箱id
    private String mail;//内容
    public Postman(Integer id, String mail) {
        this.id = id;
        this.mail = mail;
    }
    @Override
    public void run() {
        GuardedObject guardObject = MailBox.getGuardedObject(id);//取到对应id的GuardedObject对象
        log.debug("开始给{}信箱送信",id);
        guardObject.setResult(mail);//送信完成
    }
}

测试类:

public static void main(String[] args) {
    for (int i = 0; i <3; i++) {
        Person person = new Person();
        person.start();
    }
    //每个要送的信都生成一个邮递员
    for (Integer i : MailBox.getIds()) {
        Postman postman = new Postman(i, "给" + i + "号信箱的信");
        postman.start();
    }
}
10:45:12.190 [Thread-3] lxc.Postman - 166  开始给1信箱送信
10:45:12.190 [Thread-2] lxc.Person - 166  等信 3号信箱。。
10:45:12.190 [Thread-0] lxc.Person - 166  等信 1号信箱。。
10:45:12.190 [Thread-1] lxc.Person - 166  等信 2号信箱。。
10:45:12.190 [Thread-5] lxc.Postman - 166  开始给3信箱送信
10:45:12.190 [Thread-4] lxc.Postman - 166  开始给2信箱送信
10:45:12.194 [Thread-2] lxc.Person - 170  收到信了3,给3号信箱的信
10:45:12.194 [Thread-1] lxc.Person - 170  收到信了2,给2号信箱的信
10:45:12.194 [Thread-0] lxc.Person - 170  收到信了1,给1号信箱的信

异步模式 生产者消费者

要点:这种模式是Kafka的原理

与前面的保护性暂停中的GuardedObject不同,不需要产生结果和消费结果的线程一 一对应消费队列可以用来平衡生产和消费的线程资源生产者仅负责产生结果数据,不关心数据该如何处理,而消费者专心处理结果数据消息队列是有容量限制的,满时不会再加入数据,空时不会再消耗数据JDK中各种阻塞队列,采用的就是这种模式

之所以是异步模式,是因为生产者产生的消息或许不会立即被消费,而是要等到前面还有的消息被处理了,才能被处理,有一个优先级的关系,是有一定延迟的。

 代码:

Message消息类:

@Slf4j
//里面的数据不可变,保险起见加个final
final class Message{
    private int id;//唯一的id
    private Object value;

    public Message(int id, Object value) {
        this.id = id;
        this.value = value;
    }

    public int getId() {
        return id;
    }

    public Object getValue() {
        return value;
    }

    @Override
    public String toString() {
        return "Message{" +
                "id=" + id +
                ", value=" + value +
                '}';
    }
}

消息容器类:

@Slf4j
//消息队列类,java线程之间通信
public class MessageQueue {
    private linkedList list=new linkedList<>();//消息队里的集合
    private int capacity;//队列容量

    public MessageQueue(int capacity) {
        this.capacity = capacity;
    }
    //生产一个消息,相当于生产者
    public void put(Message msg){
        synchronized (list){
            while (list.size()==capacity){//检查是否满
                try {
                    log.debug("队列满,生产者线程等待");
                    list.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            list.addLast(msg);
            log.debug("生产"+msg.toString());
            list.notifyAll();
        }
    }
    //获取消息,相当于消费者
    public Message take(){
        synchronized (list){
            while (list.isEmpty()){//检查是否为空,如果为空则阻塞消费者,等待生产者唤醒
                try {
                    log.debug("队列为空,消费者线程继续等待");
                    list.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            Message message = list.removeFirst();
            log.debug("消费"+message);
            list.notifyAll();
            return message;
        }
    }
}

测试类:

@Slf4j
public class Client {
    public static void main(String[] args) {
        MessageQueue mq = new MessageQueue(2);
        for (int i = 0; i < 3; i++) {
            int id = i;
            new Thread(() -> {
                mq.put(new Message(id, "值" + id));
            }, "生产者" + i).start();
        }

        new Thread(() -> {
            while (true) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                Message take = mq.take();
            }
        }, "消费者").start();
    }
}
14:18:07.544 [生产者0] lxc.MessageQueue.MessageQueue - 198  生产Message{id=0, value=值0}
14:18:07.545 [生产者1] lxc.MessageQueue.MessageQueue - 199  生产Message{id=1, value=值1}
14:18:07.546 [生产者2] lxc.MessageQueue.MessageQueue - 200  队列满,生产者线程等待
14:18:08.549 [消费者] lxc.MessageQueue.MessageQueue - 1203  消费Message{id=0, value=值0}
14:18:08.549 [生产者2] lxc.MessageQueue.MessageQueue - 1203  生产Message{id=2, value=值2}
14:18:09.562 [消费者] lxc.MessageQueue.MessageQueue - 2216  消费Message{id=1, value=值1}
14:18:10.573 [消费者] lxc.MessageQueue.MessageQueue - 3227  消费Message{id=2, value=值2}
14:18:11.587 [消费者] lxc.MessageQueue.MessageQueue - 4241  队列为空,消费者线程继续等待

欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/zaji/5707914.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存