一、什么是延迟队列(DelayQueue)?
DelayQueue 是 Java 并发包 java.util.concurrent 下的一个 Class,其官方定义如下所示。
/**
* An unbounded {@linkplain BlockingQueue blocking queue} of
* {@code Delayed} elements, in which an element can only be taken
* when its delay has expired. The <em>head</em> of the queue is that
* {@code Delayed} element whose delay expired furthest in the
* past. If no delay has expired there is no head and {@code poll}
* will return {@code null}. Expiration occurs when an element's
* {@code getDelay(TimeUnit.NANOSECONDS)} method returns a value less
* than or equal to zero. Even though unexpired elements cannot be
* removed using {@code take} or {@code poll}, they are otherwise
* treated as normal elements. For example, the {@code size} method
* returns the count of both expired and unexpired elements.
* This queue does not permit null elements.
* /
由定义可知,DelayQueue 是一个无界阻塞队列,队列中的元素只有在延迟期满后才能被取出。队列的头部存储的是最先到期的元素。添加进该队列的元素必须实现 Delayed 接口,指定延迟时间,元素过期的判断是根据 getDelay(TimeUnit unit) 方法的返回值,返回值小于等于 0,则认为元素过期。队列不允许存储空元素。
二、DelayQueue 的使用场景
DelayQueue 被用于需要延迟处理任务的场景,例如,网民在网上商城下单后,如果超时未支付,订单会被后台系统关闭。这种需要延时处理的场景就可以采用 DelayQueue 实现。
三、原理解析(源码)
3.1 Class 定义
public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
implements BlockingQueue<E> {
// ...
}
DelayQueue 类继承了AbstractQueue,并实现了 BlockingQueue 接口,DelayQueue 的泛型参数(即队列中的元素)要实现 Delayed 接口。Delayed 接口定义如下。
public interface Delayed extends Comparable<Delayed> {
long getDelay(TimeUnit unit);
}
Delayed 接口继承了 Comparable 接口,Comparable 接口定义如下。
public interface Comparable<T> {
public int compareTo(T o);
}
所以,延迟队列中的元素要实现 getDelay(TimeUnit unit) 和 compareTo(T o) 两个方法。
- compareTo(Delayed o):用于比较延时,这是队列里元素的排序依据。当生产者线程调用 put 之类的方法加入元素时,会触发 Delayed 接口中的 compareTo 方法进行排序,也就是说队列中元素的顺序是按到期时间排序的,而非它们进入队列的顺序。排在队列头部的元素是最早到期的,越往后到期时间赿晚。
- getDelay(TimeUnit unit):这个接口返回元素是否到期,小于等于 0 表示元素已到期,大于 0 表示元素未到期。消费者线程查看队列头部的元素(注意是查看不是取出),然后调用元素的 getDelay 方法,如果此方法返回的值小于0或者等于0,则消费者线程会从队列中取出此元素,并进行处理。如果 getDelay 方法返回的值大于 0,则消费者线程 wait 返回的时间值后,再从队列头部取出元素,此时元素已经到期。
3.2 延迟队列的属性
DelayQueue 中的重要属性如下所示。
// 可重入锁,用于保证线程安全
private final transient ReentrantLock lock = new ReentrantLock();
// DelayQueue 的实现依赖于 PriorityQueue(优先队列),用于存储元素,并按过期时间优先排序
private final PriorityQueue<E> q = new PriorityQueue<E>();
// 用于优化内部阻塞通知的线程
// 第一个等待某个延时对象的线程,在延时对象还没有到期时其他线程看到这个 leader 不为 null,那么就直接 wait,主要是为了避免大量线程在同一时间点唤醒,导致大量的竞争,反而影响性能
private Thread leader = null;
// 用于实现阻塞的 Condition 对象
private final Condition available = lock.newCondition();
DelayQueue 内部使用非线程安全的优先队列(PriorityQueue),并使用 Leader-Followers (领导者-追随者)模式,最小化不必要的等待时间。什么是领导者-追随者模式,具体见本文第四节示例。
3.3 DelayQueue 的主要方法
3.3.1 offer 添加元素
public boolean offer(E e) {
// 获取全局独占锁
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 向优先队列中插入元素
q.offer(e);
// 检验元素是否为队首,是则设置 leader 为 null, 并唤醒一个消费线程
if (q.peek() == e) {
leader = null;
available.signal();
}
return true;
} finally {
// 释放全局独占锁
lock.unlock();
}
}
- leader 是等待获取队头元素的线程,领导者-追随者模式设计减少不必要的等待。
- 如果 leader != null,表示已经有线程在等待获取队头元素,会通过 await() 方法让出当前线程等待信号。
- 如果 leader == null,则把当前线程设置为 leader,当一个线程为 leader 时,会使用 awaitNanos() 让当前线程等待接受信号,或等待 delay 时间。
DelayQueue 的其他入队方法,如 add(E e) 和 put(E e) 方法,都是调用上述 offer(E e) 方法实现的。
3.3.2 take 取出元素
take() 方法取出队列元素,当没有元素被取出时,该方法阻塞。
public E take() throws InterruptedException {
// 获取全局独占锁
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
// 获取队头元素,peek 方法不会删除元素
E first = q.peek();
if (first == null)
// 若队头为空,则阻塞当前线程
available.await();
else {
// 否则获取队头元素的超时时间
long delay = first.getDelay(NANOSECONDS);
// 已超时,直接出队
if (delay <= 0)
return q.poll();
// 释放 first 的引用,避免内存泄漏
first = null; // don't retain ref while waiting
// leader != null 表明有其他线程在操作,阻塞当前线程
if (leader != null)
available.await();
else {
// leader 指向当前线程
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
// 超时阻塞
available.awaitNanos(delay);
} finally {
// 释放 leader
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
// leader 为 null 并且队列不为空,说明没有其他线程在等待,那就通知条件队列
if (leader == null && q.peek() != null)
available.signal();
// 释放全局独占锁
lock.unlock();
}
}
3.3.3 poll 取出元素
取出队头元素,当延迟队列中没有到期的元素可以取出时,返回 null。
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
E first = q.peek();
if (first == null || first.getDelay(NANOSECONDS) > 0)
return null;
else
return q.poll();
} finally {
lock.unlock();
}
}
四、什么是 Leader-Follower 模式?
DelayQueue 采用了 Leader-Follower 模式,那什么是 Leader-Follower 模式呢?举个 “饭店运作模式” 的例子来对照理解。
4.1 单 Reactor 多线程模式(Reactor 模式称为反应器模式或应答者模式)
饭店的员工一般都是分角色的,比如接待员、服务员、厨师等等。
假如有一个叫做 A 的人,固定他作为饭店接待员,来客人了就分给客人一个座位号,然后交给其他服务员,比如 B 进行后续处理。B 会根据座位号为客人引路,为客人点菜等等。如果把 A、B 比作两个线程,客人比作任务,任务由 A 处理,到交接给 B 处理,有一次线程上下文切换。
4.2 Leader-Follower 模式
这次饭店不分角色了,每个人都是接待员和服务员,统称为员工。
每次只能有一个员工在门口等待,比如 A 先在门口等待,其他员工在屋里歇着。来客人了的话,A 会叫一个其他员工,比如 B 来门口接替自己。然后 A 开始为客人服务,比如分配座位号,引路,点菜等全流程服务。拿线程来说的话,就是接受任务,处理任务都是由线程 A 负责,没有线程上下文切换。
4.3 DelayQueue 的 Leader-Follower 模式
这次饭店也不分角色,都是员工,但是改变了经营策略,每个客人必须预约吃饭时间,预约采用 APP 预约。因为加入了延时,逻辑变得复杂了一些。
每次还是只能有一个员工在门口等待,比如 A 先在门口等待,A 看了眼预约登记表,发现离预约最早到店的时间还有 30 分钟,A 就什么都不干了,先休息 30 分钟。
其他员工还是先在屋里歇着,但是因为采用 APP 预约,客人约几点都有可能,如果此时有客人约的是 10 分钟后到店,因为 A 要 30 分钟后才能醒来干活,所以如果这位客人来了,门口就没有人接待了。
对于这个问题,饭店的软件系统在监听到最早到店时间变了的话,会再叫一个员工来门口等待,此员工可能是新员工 B,也可能是叫醒了之前在门口休息的员工 A。我们叫这位新员工 X。 如果新员工 X 发现最早到店时间是现在,或者客人已经来了,就会叫一个员工 C 来门口接替自己,并立即开始为客人提供全流程服务。如果新员工 X 发现最早到店时间是 10 分钟后,新员工 X 就像 A 之前一样,什么都不干了,先休息 10 分钟。
如果最早到店时间没有变化,还是 30 分钟后,软件系统不会叫人,其他员工看到 A 在门口等待,自己可以安心的在屋里歇着,等着 A 叫人替换他。
员工 A 在 30 分钟后醒来,客人也到了,A 会叫一个同事(比如 B)接替自己,而 A 为客人提供全流程服务。
五、图解 DelayQueue 的生产/消费过程
DelayQueue 是 Leader-Follower 模式的变种,以下通过队列及消费者线程状态变化大致说明一下 DelayQueue 的运行过程。
初始状态
因为队列是没有边界的,向队列中添加元素的线程不会阻塞,添加操作相对简单,所以此图不考虑向队列添加元素的生产者线程。假设现在共有三个消费者线程。
队列中的元素按到期时间排序,队列头部的元素 2s 以后到期。消费者线程1查看了头部元素以后,发现还需要 2s 才到期,于是它进入等待状态,2s 以后醒来,等待头部元素到期的线程称为 Leader 线程。
消费者线程 2 与消费者线程 3 处于待命状态,它们不等待队列中的非头部元素。当消费者线程1拿到对象 5 以后,会向它们发送 signal。这个时候两个线程中的一个会结束待命状态而进入等待状态。
2S 以后
消费者线程1已经拿到了对象5,从等待状态进入处理状态,处理它取到的对象5,同时向消费者线程 2 与消费者线程 3 发送 signal。
消费者线程 2 与消费者线程 3 会争抢领导权,这里是消费者线程 2 进入等待状态,成为 Leader 线程,等待 2s 以后对象 4 到期。而消费者线程 3 则继续处于待命状态。
此时队列中加入了一个新元素对象6,它 10s 后到期,排在队尾。
又 2S 以后
先看线程1,如果它已经结束了对象 5 的处理,则进入待命状态。如果还没有结束,则它继续处理对象 5。
消费线程2取到对象 4 以后,也进入处理状态,同时给处于待命状态的消费线程3发送信号,消费线程3进入等待状态,成为新的 Leader。现在头部元素是新插入的对象 7,因为它 1s 以后就过期,要早于其它所有元素,所以排到了队列头部。
又 1S 后
一种不好的结果:
消费线程3一定正在处理对象 7。消费线程1与消费线程2还没有处理完它们各自取得的对象,无法进入待命状态,也更加进入不了等待状态。此时对象 3 马上要到期,那么如果它到期时没有消费者线程空下来,则它的处理一定会延期。
可以想见,如果元素进入队列的速度很快,元素之间的到期时间相对集中,而处理每个到期元素的速度又比较慢的话,则队列会越来越大,队列后边的元素延期处理的时间会越来越长。
另外一种好的结果:
消费线程1与消费线程2很快完成对取出对象的处理,及时返回重新等待队列中的到期元素。一个处于等待状态(Leader),对象 3 一到期就立刻处理。另一个则处于待命状态。这样,每一个对象都能在到期时被及时处理,不会发生明显的延期。
所以,消费者线程的数量要够,处理任务的速度要快。否则,队列中的到期元素无法被及时取出并处理,造成任务延期、队列元素堆积等情况。
如果消费者线程数过少,则来不及处理到期的任务。如果消费者线程数过多,在线程调度、同步上花更多的时间,无益改善性能。
文章参考
- 延迟队列DelayQueue原理
- DelayQueue
- DelayQueue实现原理及应用场景分析
- 读源码DelayQueue-Leader-Follower模式