让你直观理解阻塞队列

让你直观理解阻塞队列,第1张

让你直观理解阻塞队列 前言

上篇讲述了线程池的原理,回顾一下线程池执行流程,在线程池核心线程达到最大值后,之后的线程会首先加入阻塞队列中,当阻塞队列达到指定最大值后,才会创建非核心线程执行后续任务,详细内容请看https://blog.csdn.net/weixin_45055749/article/details/120858473?spm=1001.2014.3001.5501那么这个阻塞队列具体什么样的队列?

概念

阻塞队列:用于生产者-消费者 *** 作数据的一种可阻塞式的队列,具体来讲生产者生产产品加入阻塞队列中,消费者从队列中获取数据并消费

BlockingQueue 方法有四种形式,对不能立即满足但可能在未来某个时候满足的 *** 作有不同的处理方式:

  • 抛出异常:add()添加任务时,如果队列已满,抛出异常,remove()时队列任务为0抛出异常
  • 返回一个特殊值(空或假,取决于 *** 作):offer()添加任务时队列已满返回false,poll()在队列任务为0返回false
  • 线程无限期地阻塞当前线程,直到 *** 作成功:put()添加任务,当队列任务已满会阻塞,take()获取任务,当队列任务为0会阻塞
  • 阻塞仅给定的最大时间限制,然后放弃:给定具体时间与时间单位,超过单位时间后仍无法获取,则跳过
    官方文档为我们提供了这样一些参考方法:

    阻塞队列我们可以具象到两个具体的人来对阻塞队列进行 *** 作

队列四种 *** 作方式都比较好理解,我在这里重点说一下阻塞线程,其他 *** 作内部原理大致相同
我们设一个生产者和一个消费者,假如有十个消费者一个生产者,消费速度远大于生产速度,很快,队列中任务被消费者使用空,则消费者线程调用take()从队列中获取任务时,发现队列任务为0,则该线程进入睡眠状态,而此时生产者不断生产任务放入队列,当队列不为0时阻塞队列则会唤醒消费者来消费任务,反之,假如有十个生产者一个消费者,由于生产速度过快,队列已满后,生产者再次调用put()方法,则会进行阻塞,这是消费者开始执行队列中的任务,最后唤醒生产者生产任务
这就是阻塞队列的实现原理

通过图来简单看一下它的流程
我们具体来看下put()方法内部具体实现(这里使用的是常用的ArrayBlockingQueue)

  public void put(E e) throws InterruptedException {
        Objects.requireNonNull(e);  //内部判空if (obj == null) throw new NullPointerException();
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == items.length)
                notFull.await(); //队列已满,使用notFull阻塞所有生产者线程
            enqueue(e);//内部当成功添加一条任务,调用notEmpty.signal()--相当于obj.notify(),唤醒消费者
        } finally {
            lock.unlock();
        }
    }
    
       public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == 0)
                notEmpty.await();//队列为空,使用notEmpty阻塞所有消费者线程
            return dequeue();  //内部当成功添加一条任务,调用notFull.signal()--相当于obj.notify(),唤醒消费者
        } finally {
            lock.unlock();
        }
    }

// 通过代码也可以看出,阻塞队列实现阻塞方式是通过ReentrantLock中的condition来实现的,它的实例化是在构造方法中
		notEmpty = lock.newCondition();
        notFull =  lock.newCondition();
//创建的,关于condition这里不在详细介绍,读者有兴趣可以网上了解了解

在每一个添加元素方法中都会有Objects.requireNonNull(e)这一句,所以我们可以知道队列本身是不允许添加空元素的,否则会抛NullPointerException

实现类

在jdk1.5之前这些同步问题都需要程序员自己来注意并实现,在实现业务逻辑过程中,程序员需要自主代码来实现这些过程,这样就加重了程序员的负担,到了jdk1.5之后Doug Lea推出了java.util.concurrent包来解决这些问题大大减轻了编程者的负担

在java.util.concurrent下有两个阻塞队列的接口:BlockingQueue和BlockingDeque

BlockingQueue
  • ArrayBlockingQueue:一个基于数组的队列,内部通过给定的大小capacity来控制数组大小this.items = new Object[capacity];是一个常用的阻塞队列,默认为非公平锁,此队列对元素 FIFO(先进先出)进行排序。队列的 头部是在队列中停留时间最长的那个元素。队列的尾部是在队列中停留时间最短的那个元素。新元素插入队列尾部,队列检索 *** 作获取队列头部元素
  • linkedBlockingQueue:基于链接节点的可选有界阻塞队列.如果不指定,默认为Integer.MAX_VALUE,与ArrayBlockingQueue类似,差别在于内部使用的是节点管理任务
  • DelayQueue :一个无界的元素阻塞队列Delayed,其中一个元素只能在其延迟到期时被采用。虽然这个队列在逻辑上是无界的,但由于资源耗尽(导致OutOfMemoryError),每一个元素都是添加延时 *** 作,包括获取,在延迟一定时间后返回每个任务
  • PriorityBlockingQueue :基于优先级的无界阻塞队列,默认情况下元素采用自然顺序升序排列。是通过构造函数传入的对象来判断,传入的对象实现comparable接口后,每一次的添加和获取 *** 作都会调用compareTo()进行排序,但是只是把第一个元素排在首位,其他元素按照队列的一系列复杂算法排序。这就保障了每次获取到的元素都是经过排序的第一个元素。
  • SynchronousQueue: 一个阻塞队列,其中每个插入 *** 作都必须等待另一个线程执行相应的移除 *** 作,反之亦然。同步队列没有任何内部容量,甚至没有一个容量。甚至都不能叫做队列,直来直往的方式添加和删除
BlockingDeque

一个双向队列,它可以被线程安全的放入以及从中获取实例,deque是一个队列,你可以从它的两端插入和获取元素,所以叫做双向队列

这是我从官方文档获取的方法表,BlockingDeque继承自BlockingQueue,由于BlockingDeque是一个接口,而实现他的唯一一个子类:
linkedBlockingDeque:linkedBlockingDeque与linkedBlockingQueue的实现大体上类似,区别在于linkedBlockingDeque提供的 *** 作更多。并且linkedBlockingQueue内置两个锁分别用于put和take *** 作,而linkedBlockingDeque只使用一个锁控制所有 *** 作。因为队列能够同时在头尾进行put和take *** 作,所以使用两个锁也需要将两个锁同时加锁才能保证 *** 作的同步性,不如只使用一个锁的性能好。

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5575058.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-14
下一篇 2022-12-14

发表评论

登录后才能评论

评论列表(0条)

保存