阻塞队列之 LinkedBlockingQueue

阻塞队列之 LinkedBlockingQueue,第1张

阻塞队列之LinkedBlockingQueue 阻塞队列的概念

关于队列的概念,我们不在赘述,可以参考:队列以及队列的应用

阻塞队列就是在队列的基础上对队列的读写 *** 作加了阻塞的功能,并且提供了线程安全的机制

  1. 阻塞读,当从阻塞队列中获取元素的时候,如果队列不为空,正常获取,如果队列为空,线程会阻塞,等到队列中有元素在尝试获取
  2. 阻塞写,当队列不满时,可以正常插入元素,但如果队列满,并不会放弃,而是会阻塞,直到队列中的元素被消费导致队列中有空位置,在尝试插入元素
阻塞队列的核心API

BlockingQueue是阻塞队列的顶级接口,他规定了阻塞队列必须要实现的方法,下面我们列举出关于读写的几个方法:

方法功能
boolean add(E e)插入元素,成功返回true,失败会抛出一个异常IllegalStateException
boolean offer(E e)插入元素,成功返回true,失败返回false
void put(E e)插入元素,如果队列已经满了,会阻塞等待
boolean offer(E e, long timeout, TimeUnit unit)插入元素,如果队列已经满了,会阻塞等到有空间插入,但是不会一直等下去,timeoutunit决定了等待多久
boolean remove(Object o)
public E poll()
E take()检索并将队头元素出队,如果队列为空,会阻塞等待队列有元素
E poll(long timeout, TimeUnit unit)检索并将队头元素出队,如果队列为空,会阻塞等待队列有元素,但是不会一直等下去,timeoutunit决定了等待多久
常用的阻塞队列

BlockingQueue有很多实现类,提供了各种功能强大的阻塞队列为我们所用,下面我们通过他的类图看一下具体提供了那些实现类。

  • DelayedWorkQueue:优先阻塞队列,通过二叉堆保证优先级
  • LinkedBlockingQueue:通过链表实现的阻塞队列
  • ArrayBlockingQueue:通过数组实现的阻塞队列
  • LinkedTransferQueue:当消费者来取元素的时候,如果没有元素,会创建一个空的节点,并阻塞在这里,下次生产者如果添加元素,会直接将元素添加到这个节点,唤醒生产者去消费。
  • DelayQueue:支持延时获取元素的阻塞队列
  • SynchronousQueue:同步队列,不存储元素,每一个put *** 作都会等待一个take *** 作完成
  • PriorityBlockingQueue:优先阻塞队列
  • LinkedBlockingDeque:通过双向链表实现的双向阻塞队列
LinkedBlockingQueue源码探究 基本数据结构
static class Node<E> {
    E item;

    /**
      * One of:
      * - the real successor Node
      * - this Node, meaning the successor is head.next
      * - null, meaning there is no successor (this is the last node)
      */
    Node<E> next;

    Node(E x) { item = x; }
}

/**
  * Head of linked list.
  * Invariant: head.item == null
  */
transient Node<E> head;

/**
  * Tail of linked list.
  * Invariant: last.next == null
  */
private transient Node<E> last;

/** Lock held by take, poll, etc */
private final ReentrantLock takeLock = new ReentrantLock();

/** Wait queue for waiting takes */
private final Condition notEmpty = takeLock.newCondition();

/** Lock held by put, offer, etc */
private final ReentrantLock putLock = new ReentrantLock();

/** Wait queue for waiting puts */
private final Condition notFull = putLock.newCondition();

以上是组成LinkedBlockingQueue的基本数据结构,通过代码可以得到以下几点关键信息:

  1. 首先他是通过一个单向链表维护的队列,并且提供了两个成员变量headlast,分别指向队头和队尾,队头节点不存储元素,只作为一个虚拟的头结点,方便链表的 *** 作。
  2. 提供了两把由ReentrantLock实现的可重入锁takeLockputLock,用来实现从队列中取元素和写元素时的线程阻塞和线程安全的功能。我们知道ReentrantLock是基于AQS实现的,所以自然会有用于排队的CLH队列。用来存储没有拿到锁的线程
  3. 两个条件队列notEmptynotFull。这又是干什么的呢,我们知道阻塞队列取元素,如果队列为空,是需要去阻塞,那直接阻塞到CLH队列可以吗?当然不行,我们需要将他们先暂存到条件队列,等生产者给阻塞队列里面放入了元素,才可以将这个条件队列唤醒,让他们转移到CLH队列,在去竞争takeLock这把锁,否则不管有没有元素,都放到CLH队列去竞争锁,那显然是不合理的。

画一个简单的图来表示呢就是这个样子

构造方法
/**
 * 无参的构造方法,设定容量为Integer.MAX_VALUE
 */
public LinkedBlockingQueue() {
    this(Integer.MAX_VALUE);
}

/**
 * 执行容量的构造方法
 */
public LinkedBlockingQueue(int capacity) {
    if (capacity <= 0) throw new IllegalArgumentException();
    this.capacity = capacity;
    last = head = new Node<E>(null);
}

/**
 * 支持将一个集合类批量插入LinkedBlockingQueue
 */
public LinkedBlockingQueue(Collection<? extends E> c) {
    this(Integer.MAX_VALUE);
    final ReentrantLock putLock = this.putLock;
    putLock.lock(); // Never contended, but necessary for visibility
    try {
        int n = 0;
        for (E e : c) {
            if (e == null)
                throw new NullPointerException();
            if (n == capacity)
                throw new IllegalStateException("Queue full");
            enqueue(new Node<E>(e));
            ++n;
        }
        count.set(n);
    } finally {
        putLock.unlock();
    }
}
入队相关API

add

// add方法是继承自父类AbstractQueue的方法,只是调用了offer方法,
// 如果offer方法返回true,也就返回true,如果offer返回false,则抛出一个异常
public boolean add(E e) {
    if (offer(e))
        return true;
    else
        throw new IllegalStateException("Queue full");
}

offer

public boolean offer(E e) {
    if (e == null) throw new NullPointerException();
    final AtomicInteger count = this.count;
    if (count.get() == capacity)
        return false;
    int c = -1;
    Node<E> node = new Node<E>(e);
    final ReentrantLock putLock = this.putLock;
    putLock.lock();
    try {
        if (count.get() < capacity) {
            enqueue(node);
            c = count.getAndIncrement();
            if (c + 1 < capacity)
                notFull.signal();
        }
    } finally {
        putLock.unlock();
    }
    if (c == 0)
        signalNotEmpty();
    return c >= 0;
}

欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/langs/726892.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-04-26
下一篇 2022-04-26

发表评论

登录后才能评论

评论列表(0条)

保存