DelayQueue是一个线程安全的(ReentrantLock实现)、无界的(通过grow(int minCapacity)自动扩容,写不阻塞)、阻塞的(take方法会阻塞)、延迟(元素需要实现Delayed接口)队列,加入其中的元素必需实现Delayed接口。当调用put之类的方法加入元素时,会触发接口中的compareTo方法进行排序,也就是说队列中元素的顺序是按到期时间排序的(当然,compareTo方法需要我们自己去实现,如果实现不当,可能导致队头元素是没有过期的,而其他元素可能已经过期了),而非它们进入队列的顺序。排在队列头部的元素是最早到期的,越往后到期时间赿晚。没有过期元素的话,使用poll()方法会返回null值,超时判定是通过getDelay(TimeUnit.NANOSECONDS)方法的返回值小于等于0来判断。延时队列不能存放空元素。
队列元素DelayQueue
public interface Delayed extends Comparable{ long getDelay(TimeUnit unit); }
由Delayed定义可以得知,队列元素需要实现getDelay(TimeUnit unit)方法和compareTo(Delayed o)方法, getDelay定义了剩余到期时间,compareTo方法定义了元素排序规则。注意,元素的排序规则影响了元素的获取顺序,将在后面说明。
内部存储结构DelayQueue的元素存储交由优先级队列存放。
public class DelayQueueextends AbstractQueue implements BlockingQueue { private final transient ReentrantLock lock = new ReentrantLock(); private final PriorityQueue q = new PriorityQueue (); // 存放元素
DelayQueue的优先级队列使用的排序方式是队列元素的compareTo方法,优先级队列存放顺序是从小到大的,所以队列元素的compareTo方法影响了队列的出队顺序。
若compareTo方法定义不当,会造成延时高的元素在队头,延时低的元素无法出队。
获取队列元素 非阻塞获取public E poll() { final ReentrantLock lock = this.lock; lock.lock(); try { E first = q.peek(); if (first == null || first.getDelay(NANOSECONDS) > 0) return null; else return q.poll(); } finally { lock.unlock(); } } @SuppressWarnings("unchecked") public E peek() { return (size == 0) ? null : (E) queue[0]; }
由代码我们可以看出,获取元素时,总是判断PriorityQueue队列的队首元素是否到期,若未到期,返回null,所以compareTo()的方法实现不当的话,会造成队首元素未到期,而队列中有到期元素却获取不到的情况。因此,队列元素的compareTo方法实现需要注意,应该根据过期时间进行排序。
阻塞获取public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { for (;;) { E first = q.peek(); if (first == null) available.await(); else { long delay = first.getDelay(NANOSECONDS); if (delay <= 0) return q.poll(); first = null; // don't retain ref while waiting if (leader != null) available.await(); else { Thread thisThread = Thread.currentThread(); leader = thisThread; try { available.awaitNanos(delay); } finally { if (leader == thisThread) leader = null; } } } } } finally { if (leader == null && q.peek() != null) available.signal(); lock.unlock(); } }适用场景
实现自己的缓存系统,订单到期,限时支付等等。
扩展Spring Security OAuth2中,有一个InMemoryTokenStore,这个类就使用了DelayQueue存储Token的过期时间,详见InMemoryTokenStore的源码。InMemoryTokenStore的flush()方法会调用expiryQueue.poll()方法,这个方法会返回已经过期的对象,然后将其移除,以此来实现token的超时。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)