第四次循环就到了下标0的位置,也就是元素8,首先完成1和3的替换,然后完成3和8的替换:
! 《一线大厂Java面试题解析+后端开发学习笔记+最新架构讲解视频+实战项目源码讲义》无偿开源 威信搜索公众号【编程进阶路】 [在这里插入图片描述](()
这时候因为最小子节点的下标是2,2 上图中两个流程可以看到,元素8会一路下沉到最后。 到这里完成了初始化排序,最终数组由:[8,5,2,7,6,4,1,9,3]变为[1,3,2,5,6,4,8,9,7]。 [](()添加元素(生产者) put(E)方法会调用offer(E)方法,[上一篇阻塞队列](()的文章中,我们知道,offer(E)方法是不阻塞的,而这里是无界数组也不会阻塞,所以直接调用offer(E)方法就可以了: 这里逻辑比较简单,首先看有没有越界,越界了就先进行扩容,扩容放在后面讲。 然后添加元素主要就是进行上浮过程,进入默认的排序规则上浮方法siftUpComparable: 还是用上面排序后的二叉堆,假如我们现在添加一个元素4,会得到下面这样一个二叉堆: 这时候为了确保新添加的元素按照排序规则不会比根节点小,需要将新添加的元素进行上浮 *** 作。 发现4<6,所以将6放到队尾,注意这时候4并不会赋值到队列中,因为4还需要继续上浮确认放在哪个位置 第二次上浮会发现4<3,不满足所以会跳出循环,确认将4放在了下标4的位置,完成插入元素操作 [](()获取元素(消费者) 调用take()方法获取元素 主要看dequeue()方法: 这个方法的主要逻辑为: 1、先拿到第一个元素(需要返回)和最后一个元素 2、然后将最后一个元素置为空 3、用存好的最后一个元素的值从头开始下沉 最后一步下沉 *** 作和初始化的最后一步下沉 *** 作是一样的处理方式,直到完成下沉就会诞生一个最小的元素重新放到头节点 [](()扩容 最后我们来分析下扩容tryGrow方法 private void tryGrow(Object[] array, int oldCap) { lock.unlock(); //扩容前先释放锁(扩容可能会费时,先让出锁,让出队线程可以正常 *** 作) Object[] newArray = null; if (allocationSpinLock == 0 && UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset, 0, 1)) {//通过CAS *** 作确保只有一个线程可以扩容 try { int newCap = oldCap + ((oldCap < 64) ? (oldCap + 2) : // grow faster if small (oldCap >> 1)); if (newCap - MAX_ARRAY_SIZE > 0) {//大于当前最大容量则可能溢出 int minCap = oldCap + 1; if (minCap < 0 || minCap > MAX_ARRAY_SIZE)//扩大一个元素也溢出或者超过最大容量则抛出异常 throw new OutOfMemoryError(); newCap = MAX_ARRAY_SIZE;//扩容后如果超过最大容量,则只扩大到最大容量 } if (newCap > oldCap && queue == array) newArray = new Object[newCap];//根据最新容量初始化一个新数组 } finally { allocationSpinLock = 0; } } if (newArray == null) //如果是空,说明前面CAS失败,有线程在扩容,让出CPU Thread.yield(); lock.lock();//这里重新加锁是确保数组复制 *** 作只有一个线程能进行 if (newArray != null && queue == array) { queue = newArray; System.arraycopy(array, 0, newArray, 0, oldCap);//将旧的元素复制到新数组 } } [](()DelayQueue ======================================================================= DelayQueue是一个支持延时获取元素的无界阻塞队列。队列中使用PriorityQueue来实现。队列中的元素必须实现Delayed接口: 接口里定义了一个getDelay方法来获取当前剩余的过期时间,另外因实现了Comparable接口,所以还会有一个compareTo方法。 [](()DelayQueue使用示例 1、新建一个对象,实现Delayed ,并重写getDelay和compareTo package com.zwx.concurrent.queue.block.model; import java.sql.Time; import java.util.concurrent.Delayed; import java.util.concurrent.TimeUnit; public class MyElement implements Delayed { private long expireTime;//过期时间(毫秒) private int id; public long getExpireTime() { return expireTime; } public void setExpireTime(long expireTime) { this.expireTime = expireTime; } public int getId() { return id; } public void setId(int id) { this.id = id; } public MyElement(int id, long expireTime) { this.id = id; this.expireTime = System.currentTimeMillis() + expireTime; } @Override public long getDelay(TimeUnit unit) { //类里面接收的是毫秒,但是getDelay方法在DelayQeue里面传的是纳秒,所以这里需要进行一次单位转换 return unit.convert(expireTime - System.currentTimeMillis(),TimeUnit.MILLISECONDS); } @Override public int compareTo(Delayed o) { //注意,这里的排序要确定最先到期的放在第一位,否则会阻塞住后面未到期的 return Long.valueOf(expireTime).compareTo(((MyElement) o).expireTime); } } package com.zwx.concurrent.queue.block; import com.zwx.concurrent.queue.block.model.MyElement; import java.util.ArrayList; import java.util.List; import java.util.concurrent.DelayQueue; public class DelayQueueDemo { public static void main(String[] args) { List list = new ArrayList<>(); for (int i=1;i<=5;i++){ MyElement myElement = new MyElement(i,i*1000); list.add(myElement); } DelayQueue delayQueue = new DelayQueue(list); while (true){ try { MyElement myElement = (MyElement) delayQueue.take(); System.out.println(myElement.getId()); } catch (InterruptedException e) { e.printStackTrace(); } } } } [](()DelayQueue类图 接下来看看类图 只有两个构造器,第一个是空的构造器,第二个是默认初始化一个集合。 [](()初始化 通过循环调用add(e)方法进行添加,然后add方法又去调用了offer(e)方法: [](()添加元素(消费者) DelayQueue队列的元素是存在其内部维护的PriorityQueue上,所以上面调用了q.offer(e)方法。 leader表示获取到锁的线程。q.peek()==e表示当前第一个元素就是刚刚添加进去的元素,所以需要将leader设置为空,唤醒出队(消费者)线程重新争抢锁。 q.offer(e)方法的处理方式基本和上面讲的PriorityBlockingQueue中逻辑一致 [](()获取元素(消费者) take方法会依次获取元素,如果第一个元素没到期,则会一直阻塞: public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { for (;😉 { E first = q.peek(); if (first == null) available.await();//队列为空,则阻塞 else { long delay = first.getDelay(NANOSECONDS); if (delay <= 0) return q.poll();//如果到期了,则调用poll方法取元素并直接返回 first = null; // don’t retain ref while waiting if (leader != null) available.await();//头节点不为空,说明有线程持有锁并正在等待到期时间,所以直接阻塞 else {//leader==null Thread thisThread = Thread.currentThread(); leader = thisThread;//设置头节点为当前线程,表名有线程在等待头节点元素过期 try { available.awaitNanos(delay);//阻塞指定时间 } finally { if (leader == thisThread) leader = null; } } } } } finally { if (leader == null && q.peek() != null) available.signal(); lock.unlock(); } } [](()Leader-Follower线程模型 在Leader-follower线程模型中每个线程有三种模式: leader:只有一个线程成为leader,如DelayQueue如果有一个线程在等待元素到期,则其他线程就会阻塞等待 follower:会一直尝试争抢leader,抢到leader之后才开始干活 processing:处理中的线程 DelayQueue队列中有一个leader属性:private Thread leader = null;用到的就是Leader-Follower线程模型。 欢迎分享,转载请注明来源:内存溢出
评论列表(0条)