一、Future&FutureTask
在《Java线程详解》这篇文章中,介绍了创建一个Java线程的三种方式,其中继承Thread
类或实现Runnable
接口都可以创建线程,但这两种方法都有一个问题就是:没有返回值,不能获取执行完的结果。因此后面在JDK1.5才新增了一个Callable
接口来解决上面的问题,而Future
和FutureTask
就可以与Callable
配合起来使用。
而Callable
只能在线程池中提交任务使用,且只能在submit()
和invokeAnay()
以及invokeAll()
这三个任务提交的方法中使用,如果需要直接使用Thread
的方式启动线程,则需要使用FutureTask
对象作为Thread
的构造参数,而FutureTask
的构造参数又是Callable
的对象
下面展示线程中的使用,以submit()
为例,其源码如下:
在该方法中,只是把Callable
封装成了FutureTask
而已,任务执行依然是execute()
方法
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return new FutureTask<T>(callable);
}
1.1 Callable和Runnable的区别
@FunctionalInterface
public interface Runnable {
public abstract void run();
}
@FunctionalInterface
public interface Callable<V> {
V call() throws Exception;
}
Callable的call方法可以有返回值,可以声明抛出异常。和 Callable
配合的有一个Future
类,通过Future
可以了解任务执行情况,或者取消任务的执行,还可获取任务执行的结果,这些功能都是Runnable
做不到的,Callable
的功能要比Runnable
强大。
new Thread(new Runnable() {
@Override
public void run() {
System.out.println("通过Runnable方式执行任务");
}
}).start();
// 需要借助FutureTask
FutureTask task = new FutureTask(new Callable() {
@Override
public Object call() throws Exception {
System.out.println("通过Callable方式执行任务");
Thread.sleep(3000);
return "返回任务结果";
}
});
new Thread(task).start();
1.2 从Future到FutureTask
先看一下Future
的源码及其定义:
源码中接口上面的注释已经解释地很清楚了,Future就是对于具体的Runnable或者Callable任务的执行结果进行取消、查询是否完成、获取结果。必要时可以通过get方法获取执行结果,该方法会阻塞直到任务返回结果。
/**
* A {@code Future} represents the result of an asynchronous
* computation. Methods are provided to check if the computation is
* complete, to wait for its completion, and to retrieve the result of
* the computation. The result can only be retrieved using method
* {@code get} when the computation has completed, blocking if
* necessary until it is ready. Cancellation is performed by the
* {@code cancel} method. Additional methods are provided to
* determine if the task completed normally or was cancelled. Once a
* computation has completed, the computation cannot be cancelled.
* If you would like to use a {@code Future} for the sake
* of cancellability but not provide a usable result, you can
* declare types of the form {@code Future<?>} and
* return {@code null} as a result of the underlying task.
*/
public interface Future<V> {
// 取消任务的执行,参数表示是否立即中断任务执行,或者等任务结束
boolean cancel(boolean mayInterruptIfRunning);
// 任务是否已经取消,任务完成前将其取消,则返回true
boolean isCancelled();
// 任务是否已经完成
boolean isDone();
// 等待任务执行结束,返回泛型结果.中断或任务执行异常都会抛出异常
V get() throws InterruptedException, ExecutionException;
// 同上面的get功能一样,多了设置超时时间。参数timeout指定超时时间,uint指定时间的单位,在枚举类TimeUnit中有相关的定义。如果计算超时,将抛出TimeoutException
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
其工作过程大概如下:
Future
归根结底只是一个接口,而FutureTask
实现了这个接口,同时还实现了Runnalbe
接口,这样FutureTask
就相当于是消费者和生产者的桥梁了,消费者可以通过FutureTask
存储任务的执行结果,跟新任务的状态:未开始、处理中、已完成、已取消等等。而任务的生产者可以拿到FutureTask
被转型为Future
接口,可以阻塞式的获取处理结果,非阻塞式获取任务处理状态
总结:FutureTask既可以被当做Runnable来执行,也可以被当做Future来获取Callable的返回结果。
public class FutureTask<V> implements RunnableFuture<V> {
private volatile int state;
private static final int NEW = 0;
private static final int COMPLETING = 1;
private static final int NORMAL = 2;
private static final int EXCEPTIONAL = 3;
private static final int CANCELLED = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED = 6;
/** The underlying callable; nulled out after running */
// 任务
private Callable<V> callable;
/** The result to return or exception to throw from get() */
// 执行结果或异常
private Object outcome; // non-volatile, protected by state reads/writes
/** The thread running the callable; CASed during run() */
// 执行任务的线程
private volatile Thread runner;
/** Treiber stack of waiting threads */
private volatile WaitNode waiters;
}
public interface RunnableFuture<V> extends Runnable, Future<V> {
/**
* Sets this Future to the result of its computation
* unless it has been cancelled.
*/
void run();
}
1.3 FutureTask使用
使用方式在上面已经介绍过了,构建一个FutureTask
对象,其构造方法的入参为Callable
的实例对象,然后将FutureTask
对象作为Thread
构造方法的入参。
这里展示一个实际的应用场景,平常在使用购物软件抢购促销产品的时候,需要查看商品信息(包括商品基本信息、商品价格、商品库存、商品图片)。而这些信息一般都分布在不同的业务中心,由不同的系统提供服务。如果采用同步方式,假设一个接口需要50ms,那么一个商品查询下来就需要200ms-300ms,这对于我们来说是不满意的。如果使用Future
改造则需要的就是最长耗时服务的接口,也就是50ms左右。
如果采用同步方式,假设一个接口需要50ms,那么一个商品查询下来就需要200ms-300ms,这对于我们来说是不满意的。如果使用Future改造则需要的就是最长耗时服务的接口,也就是50ms左右。
Future的注意事项:
- 当 for 循环批量获取Future的结果时容易 block,get 方法调用时应使用 timeout 限制
- Future 的生命周期不能后退。一旦完成了任务,它就永久停在了“已完成”的状态,不能从头再来
Future的局限性:
从本质上说,Future表示一个异步计算的结果。它提供了isDone()来检测计算是否已经完成,并且在计算结束后,可以通过get()方法来获取计算结果。在异步计算中,Future确实是个非常优秀的接口。但是,它的本身也确实存在着许多限制:
- 并发执行多任务:Future只提供了get()方法来获取结果,并且是阻塞的。所以,除了等待你别无他法;
- 无法对多个任务进行链式调用:如果你希望在计算任务完成后执行特定动作,比如发邮件,但Future却没有提供这样的能力;
- 无法组合多个任务:如果你运行了10个任务,并期望在它们全部执行结束后执行特定动作,那么在Future中这是无能为力的;
- 没有异常处理:Future接口中没有关于异常处理的方法;
而这些局限性CompletionService
和CompletableFuture
都解决了,这边文章重点介绍CompletionService
,后面会出一篇文章介绍CompletableFuture
的使用
二、CompletionService使用
Callable+Future
虽然可以实现多个task并行执行,但是如果遇到前面的task执行较慢时需要阻塞等待前面的task执行完后面task才能取得结果。而CompletionService
的主要功能就是一边生成任务,一边获取任务的返回值。让两件事分开执行,任务之间不会互相阻塞,可以实现先执行完的先取结果,不再依赖任务顺序了。
CompletionService
内部通过阻塞队列+FutureTask
,实现了任务先完成可优先获取到,即结果按照完成先后顺序排序,内部有一个先进先出的阻塞队列,用于保存已经执行完成的Future,通过调用它的take()
或poll()
可以获取到一个已经执行完成的Future
,进而通过调用Future
接口实现类的get方法获取最终的结果
2.1 使用案例分析
2.1.1 电商询价
直接Future
方式
向不同电商平台询价,并保存价格,采用ThreadPoolExecutor+Future
的方案:异步执行询价然后再保存
// 创建线程池
ExecutorService executor = Executors.newFixedThreadPool(2);
// 异步向电商S1询价
Future<Integer> f1 = executor.submit(()->getPriceByS1());
// 异步向电商S2询价
Future<Integer> f2= executor.submit(()->getPriceByS2());
// 获取电商S1报价并异步保存
executor.execute(()->save(f1.get()));
// 获取电商S2报价并异步保存
executor.execute(()->save(f2.get())
如果获取电商S1报价的耗时很长,那么即便获取电商S2报价的耗时很短,也无法让保存S2报价的操作先执行,因为这个主线程都阻塞 在了f1.get()
操作上。
CompletionService
方式
使用CompletionService
实现先获取的报价先保存到数据库
//创建线程池
ExecutorService executor = Executors.newFixedThreadPool(10);
//创建CompletionService
CompletionService<Integer> cs = new ExecutorCompletionService<>(executor);
//异步向电商S1询价
cs.submit(() -> getPriceByS1());
//异步向电商S2询价
cs.submit(() -> getPriceByS2());
//将询价结果异步保存到数据库
for (int i = 0; i < 2; i++) {
Integer r = cs.take().get();
executor.execute(() -> save(r));
}
2.1.2 Dobbo的Forking Cluter场景
Dubbo
中有一种叫做Forking
的集群模式,这种集群模式下,支持并行地调用多个服务实例,只要有一个成功就返回结果。
geocoder(addr) {
//并行执行以下3个查询服务,
r1=geocoderByS1(addr);
r2=geocoderByS2(addr);
r3=geocoderByS3(addr);
//只要r1,r2,r3有一个返回
//则返回
return r1|r2|r3;
}
通过CompletionService
来实现这种Forking
集群模式
// 创建线程池
ExecutorService executor = Executors.newFixedThreadPool(3);
// 创建CompletionService
CompletionService<Integer> cs = new ExecutorCompletionService<>(executor);
// 用于保存Future对象
List<Future<Integer>> futures = new ArrayList<>(3);
//提交异步任务,并保存future到futures
futures.add(cs.submit(()->geocoderByS1()));
futures.add(cs.submit(()->geocoderByS2()));
futures.add(cs.submit(()->geocoderByS3()));
// 获取最快返回的任务执行结果
Integer r = 0;
try {
// 只要有一个成功返回,则break
for (int i = 0; i < 3; ++i) {
r = cs.take().get();
//简单地通过判空来检查是否成功返回
if (r != null) {
break;
}
}
} finally {
//取消所有任务
for(Future<Integer> f : futures)
f.cancel(true);
}
// 返回结果
return r;
应用场景总结
- 当需要批量提交异步任务的时候建议使用
CompletionService
。CompletionService
将线程池Executor
和阻塞队列BlockingQueue
的功能融合在了一起,能够让批量异步任务的管理更简单。 CompletionService
能够让异步任务的执行结果有序化。先执行完的先进入阻塞队列,利用这个特性,你可以轻松实现后续处理的有序性,避免无谓的等待,同时还可以快速实现诸如Forking Cluster
这样的需求。- 线程池隔离。
CompletionService
支持自己创建线程池,这种隔离性能避免几个特别耗时的任务拖垮整个应用的风险。
三、源码分析
3.1 构造方法
CompletionService
只是一个接口,它定义了一套提交任务和获取结果的方法,而它唯一的实现类ExecutorCompletionService
来实现了这些方法,该类构造如下:
public class ExecutorCompletionService<V> implements CompletionService<V> {
// 执行任务的线程池
private final Executor executor;
private final AbstractExecutorService aes;
// 任务完成会记录在该队列中
private final BlockingQueue<Future<V>> completionQueue;
}
ExecutorCompletionService
提供了两个构造方法,其中线程池是一定要有的参数,任务完成的记录队列默认使用的是LinkedBlockingQueue
public ExecutorCompletionService(Executor executor) {
if (executor == null)
throw new NullPointerException();
this.executor = executor;
this.aes = (executor instanceof AbstractExecutorService) ?
(AbstractExecutorService) executor : null;
this.completionQueue = new LinkedBlockingQueue<Future<V>>();
}
public ExecutorCompletionService(Executor executor,
BlockingQueue<Future<V>> completionQueue) {
if (executor == null || completionQueue == null)
throw new NullPointerException();
this.executor = executor;
this.aes = (executor instanceof AbstractExecutorService) ?
(AbstractExecutorService) executor : null;
this.completionQueue = completionQueue;
}
3.2 任务执行与结果记录
提供了两个任务提交的方法,可以传Callable
和Runnable
类型的参数,这内部都会将其转换为RunnableFutuer
实例,然后再封装成QueueingFuture
实例作为任务来执行
public Future<V> submit(Callable<V> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<V> f = newTaskFor(task);
executor.execute(new QueueingFuture(f));
return f;
}
public Future<V> submit(Runnable task, V result) {
if (task == null) throw new NullPointerException();
RunnableFuture<V> f = newTaskFor(task, result);
executor.execute(new QueueingFuture(f));
return f;
}
3.2.1 封装RunnableFuture
不论aes
是否为null,它们最终调用的都是FutureTask
的构造方法
private RunnableFuture<V> newTaskFor(Callable<V> task) {
if (aes == null)
return new FutureTask<V>(task);
else
return aes.newTaskFor(task);
}
private RunnableFuture<V> newTaskFor(Runnable task, V result) {
if (aes == null)
return new FutureTask<V>(task, result);
else
return aes.newTaskFor(task, result);
}
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return new FutureTask<T>(callable);
}
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
return new FutureTask<T>(runnable, value);
}
在前面的已经看过FutureTask
的结构,它的任务是使用Callable
实例表示的,所以对于Runnable
类型的任务,会将其封装成一个Callable
类型任务
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW; // ensure visibility of callable
}
public FutureTask(Runnable runnable, V result) {
// 线程池静态方法,将runnable封装成一个callable
this.callable = Executors.callable(runnable, result);
this.state = NEW; // ensure visibility of callable
}
public static <T> Callable<T> callable(Runnable task, T result) {
if (task == null)
throw new NullPointerException();
// 构造一个适配对象
return new RunnableAdapter<T>(task, result);
}
// 继承了Callble,内部存储Runnablle,调用call方法时,内部调用runnable的run方法
static final class RunnableAdapter<T> implements Callable<T> {
final Runnable task;
final T result;
RunnableAdapter(Runnable task, T result) {
this.task = task;
this.result = result;
}
public T call() {
task.run();
return result;
}
}
3.2.2 任务提交
QueueingFuture
是ExecutorCompletionService
的内部类,其实现了FutureTask
接口,当任务执行时,会去调用FutureTask
的run()
private class QueueingFuture extends FutureTask<Void> {
QueueingFuture(RunnableFuture<V> task) {
super(task, null);
this.task = task;
}
protected void done() { completionQueue.add(task); }
private final Future<V> task;
}
下面是FutureTask
的run()
public void run() {
// 任务已经被执行,直接退出
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
// 执行任务
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
// 记录异常
setException(ex);
}
// 任务执行成功,则记录返回结果
if (ran)
set(result);
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
在任务执行成功,记录返回记录结果的时候,会调用finishCompletion()
去唤醒所有阻塞的线程并调用done()
方法
protected void set(V v) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = v;
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
finishCompletion();
}
}
private void finishCompletion() {
// assert state > COMPLETING;
for (WaitNode q; (q = waiters) != null;) {
if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
// 唤醒后面所有等待的线程
for (;;) {
Thread t = q.thread;
if (t != null) {
q.thread = null;
LockSupport.unpark(t);
}
WaitNode next = q.next;
if (next == null)
break;
q.next = null; // unlink to help gc
q = next;
}
break;
}
}
done();
// 任务执行完了,将其置为null
callable = null; // to reduce footprint
}
而QueueingFuture
内部类就实现了done()
方法,它将执行完的FutureTask
放入到阻塞队列中,当调用take()
方法时就可以取到任务的执行结果,如果任务都还没有执行完,就阻塞。