【并发编程系列9】阻塞队列之PriorityBlockingQueue,DelayQueue原理分析

【并发编程系列9】阻塞队列之PriorityBlockingQueue,DelayQueue原理分析,第1张

[](()第四次下沉

第四次循环就到了下标0的位置,也就是元素8,首先完成1和3的替换,然后完成3和8的替换:

! 《一线大厂Java面试题解析+后端开发学习笔记+最新架构讲解视频+实战项目源码讲义》无偿开源 威信搜索公众号【编程进阶路】 [在这里插入图片描述](()

这时候因为最小子节点的下标是2,2

上图中两个流程可以看到,元素8会一路下沉到最后。

到这里完成了初始化排序,最终数组由:[8,5,2,7,6,4,1,9,3]变为[1,3,2,5,6,4,8,9,7]。

[](()添加元素(生产者)


put(E)方法会调用offer(E)方法,[上一篇阻塞队列](()的文章中,我们知道,offer(E)方法是不阻塞的,而这里是无界数组也不会阻塞,所以直接调用offer(E)方法就可以了:

这里逻辑比较简单,首先看有没有越界,越界了就先进行扩容,扩容放在后面讲。

然后添加元素主要就是进行上浮过程,进入默认的排序规则上浮方法siftUpComparable:

还是用上面排序后的二叉堆,假如我们现在添加一个元素4,会得到下面这样一个二叉堆:

这时候为了确保新添加的元素按照排序规则不会比根节点小,需要将新添加的元素进行上浮 *** 作。

[](()第一次上浮

发现4<6,所以将6放到队尾,注意这时候4并不会赋值到队列中,因为4还需要继续上浮确认放在哪个位置

[](()第二次上浮

第二次上浮会发现4<3,不满足所以会跳出循环,确认将4放在了下标4的位置,完成插入元素操作

[](()获取元素(消费者)


调用take()方法获取元素

主要看dequeue()方法:

这个方法的主要逻辑为:

1、先拿到第一个元素(需要返回)和最后一个元素

2、然后将最后一个元素置为空

3、用存好的最后一个元素的值从头开始下沉

最后一步下沉 *** 作和初始化的最后一步下沉 *** 作是一样的处理方式,直到完成下沉就会诞生一个最小的元素重新放到头节点

[](()扩容


最后我们来分析下扩容tryGrow方法

private void tryGrow(Object[] array, int oldCap) {

lock.unlock(); //扩容前先释放锁(扩容可能会费时,先让出锁,让出队线程可以正常 *** 作)

Object[] newArray = null;

if (allocationSpinLock == 0 &&

UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,

0, 1)) {//通过CAS *** 作确保只有一个线程可以扩容

try {

int newCap = oldCap + ((oldCap < 64) ?

(oldCap + 2) : // grow faster if small

(oldCap >> 1));

if (newCap - MAX_ARRAY_SIZE > 0) {//大于当前最大容量则可能溢出

int minCap = oldCap + 1;

if (minCap < 0 || minCap > MAX_ARRAY_SIZE)//扩大一个元素也溢出或者超过最大容量则抛出异常

throw new OutOfMemoryError();

newCap = MAX_ARRAY_SIZE;//扩容后如果超过最大容量,则只扩大到最大容量

}

if (newCap > oldCap && queue == array)

newArray = new Object[newCap];//根据最新容量初始化一个新数组

} finally {

allocationSpinLock = 0;

}

}

if (newArray == null) //如果是空,说明前面CAS失败,有线程在扩容,让出CPU

Thread.yield();

lock.lock();//这里重新加锁是确保数组复制 *** 作只有一个线程能进行

if (newArray != null && queue == array) {

queue = newArray;

System.arraycopy(array, 0, newArray, 0, oldCap);//将旧的元素复制到新数组

}

}

[](()DelayQueue

=======================================================================

DelayQueue是一个支持延时获取元素的无界阻塞队列。队列中使用PriorityQueue来实现。队列中的元素必须实现Delayed接口:

接口里定义了一个getDelay方法来获取当前剩余的过期时间,另外因实现了Comparable接口,所以还会有一个compareTo方法。

[](()DelayQueue使用示例


1、新建一个对象,实现Delayed ,并重写getDelay和compareTo

package com.zwx.concurrent.queue.block.model;

import java.sql.Time;

import java.util.concurrent.Delayed;

import java.util.concurrent.TimeUnit;

public class MyElement implements Delayed {

private long expireTime;//过期时间(毫秒)

private int id;

public long getExpireTime() {

return expireTime;

}

public void setExpireTime(long expireTime) {

this.expireTime = expireTime;

}

public int getId() {

return id;

}

public void setId(int id) {

this.id = id;

}

public MyElement(int id, long expireTime) {

this.id = id;

this.expireTime = System.currentTimeMillis() + expireTime;

}

@Override

public long getDelay(TimeUnit unit) {

//类里面接收的是毫秒,但是getDelay方法在DelayQeue里面传的是纳秒,所以这里需要进行一次单位转换

return unit.convert(expireTime - System.currentTimeMillis(),TimeUnit.MILLISECONDS);

}

@Override

public int compareTo(Delayed o) {

//注意,这里的排序要确定最先到期的放在第一位,否则会阻塞住后面未到期的

return Long.valueOf(expireTime).compareTo(((MyElement) o).expireTime);

}

}

package com.zwx.concurrent.queue.block;

import com.zwx.concurrent.queue.block.model.MyElement;

import java.util.ArrayList;

import java.util.List;

import java.util.concurrent.DelayQueue;

public class DelayQueueDemo {

public static void main(String[] args) {

List list = new ArrayList<>();

for (int i=1;i<=5;i++){

MyElement myElement = new MyElement(i,i*1000);

list.add(myElement);

}

DelayQueue delayQueue = new DelayQueue(list);

while (true){

try {

MyElement myElement = (MyElement) delayQueue.take();

System.out.println(myElement.getId());

} catch (InterruptedException e) {

e.printStackTrace();

}

}

}

}

[](()DelayQueue类图


接下来看看类图

只有两个构造器,第一个是空的构造器,第二个是默认初始化一个集合。

[](()初始化


通过循环调用add(e)方法进行添加,然后add方法又去调用了offer(e)方法:

[](()添加元素(消费者)


DelayQueue队列的元素是存在其内部维护的PriorityQueue上,所以上面调用了q.offer(e)方法。

leader表示获取到锁的线程。q.peek()==e表示当前第一个元素就是刚刚添加进去的元素,所以需要将leader设置为空,唤醒出队(消费者)线程重新争抢锁。

q.offer(e)方法的处理方式基本和上面讲的PriorityBlockingQueue中逻辑一致

[](()获取元素(消费者)


take方法会依次获取元素,如果第一个元素没到期,则会一直阻塞:

public E take() throws InterruptedException {

final ReentrantLock lock = this.lock;

lock.lockInterruptibly();

try {

for (;😉 {

E first = q.peek();

if (first == null)

available.await();//队列为空,则阻塞

else {

long delay = first.getDelay(NANOSECONDS);

if (delay <= 0)

return q.poll();//如果到期了,则调用poll方法取元素并直接返回

first = null; // don’t retain ref while waiting

if (leader != null)

available.await();//头节点不为空,说明有线程持有锁并正在等待到期时间,所以直接阻塞

else {//leader==null

Thread thisThread = Thread.currentThread();

leader = thisThread;//设置头节点为当前线程,表名有线程在等待头节点元素过期

try {

available.awaitNanos(delay);//阻塞指定时间

} finally {

if (leader == thisThread)

leader = null;

}

}

}

}

} finally {

if (leader == null && q.peek() != null)

available.signal();

lock.unlock();

}

}

[](()Leader-Follower线程模型


在Leader-follower线程模型中每个线程有三种模式:

  • leader:只有一个线程成为leader,如DelayQueue如果有一个线程在等待元素到期,则其他线程就会阻塞等待

  • follower:会一直尝试争抢leader,抢到leader之后才开始干活

  • processing:处理中的线程

DelayQueue队列中有一个leader属性:private Thread leader = null;用到的就是Leader-Follower线程模型。

欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/langs/905981.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-05-15
下一篇 2022-05-15

发表评论

登录后才能评论

评论列表(0条)

保存