CompletableFuture是Java 8提供了新特性,实现了CompletionStage接口和Future接口,增加了异步回调、流式处理、多个Future组合处理的能力。可以帮助我们简化异步编程的复杂性,让我们在Java在处理多任务的协同工作时更加顺畅便利。以下是记录一次使用CompletableFuture优化业务代码的技巧。
- 需求场景:
业务有一接口获取数据,数据需要在一个for loop中查询的数据,每个查询都是独立的互不影响,而且不能使用批量查询。当有一个查询抛出正常的业务异常时,直接把异常抛出无需继续往下查询。如果每一查询成功,则返回一个结果list。
- 方案梳理
- for loop下逐个查询,时间复杂度时O(n);
- for loop增加线程池处理,理想的时间复杂度是O(1);
明显使用线程池可以提升查询响应时间,值得注意是这里是要求有返回结果的。我们可以使用future.get()
来阻塞获取线程池的结果包括异常。
但是,仅仅通过future.get()
来获取结果就很容踩到一些坑的:
future.get()
获取的异常经过包裹,很难区分出业务正常抛出的异常;- for loop下
future.get()
,需要阻塞等等所有线程执行完毕,没有很好满足单一线程抛出业务异常就返回结果。
下面就使用CompletableFuture来优化for loop下线程池处理业务以及异常的处理方法。
-
方法前准备
线程池executor、resultList(并发下注意使用安全容器)、futureList
private static final ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("my-pool-%d").get();
private static final ThreadPoolExecutor executor = new ThreadPoolExecutor(10, 10, 10, TimeUnit.SECONDS, new LinkedBlockingQueue<>(100), threadFactory, new ThreadPoolExecutor.CallerRunsPolicy());
// private static final List<Integer> resultList = new ArrayList<>();
private static final List<Integer> resultList = Collections.synchronizedList(new ArrayList<>());
private static final List<CompletableFuture<Integer>> futureList = new ArrayList<>();
- 测试方法
记录方法执行时间和异常发生的时间
public static void main(String[] args) {
long start = System.currentTimeMillis();
try {
doSomething(executor, resultList, futureList);
} catch (Exception e) {
System.out.println("main 处理了异常 -> " + e.getMessage());
}
System.out.println("方法执行时间: " + (System.currentTimeMillis() - start));
System.out.println("main 执行完了...");
executor.shutdown();
}
- 方法1
private static void doSomething(ThreadPoolExecutor executor, List<Integer> resultList, List<CompletableFuture<Integer>> futureList) {
for (int i = 0; i < 10; i++) {
int finalI = i;
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(finalI * 1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
if (finalI == 4) {
System.out.println("业务异常... -> " + finalI);
throw new RuntimeException("业务异常... -> " + finalI);
}
return finalI;
}, executor).handle((result, e) -> {
if (e != null) {
System.out.println("CompletableFuture处理异常 -> " + e.getMessage());
throw new RuntimeException("CompletableFuture处理异常 -> " + e.getMessage());
}
resultList.add(result);
return result;
});
futureList.add(future);
}
CompletableFuture.allOf(futureList.toArray(new CompletableFuture[0])).join();
System.out.println(resultList.size());
System.out.println(resultList);
}
测试结果
业务异常... -> 4
CompletableFuture处理异常 -> java.lang.RuntimeException: 业务异常... -> 4
main 处理了异常 -> java.lang.RuntimeException: CompletableFuture处理异常 -> java.lang.RuntimeException: 业务异常... -> 4
方法执行时间: 9081
main 执行完了...
方法执行时间是9秒,说明即使某一个线程发生了异常, 也要等待所有的线程执行完才返回。
- 方法2
private static void doSomething2(ThreadPoolExecutor executor, List<Integer> resultList, List<CompletableFuture<Integer>> futureList) {
StringBuffer sb = new StringBuffer();
for (int i = 0; i < 10; i++) {
int finalI = i;
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(finalI * 1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
if (finalI == 4) {
System.out.println("业务异常... -> " + finalI);
throw new RuntimeException("业务异常... -> " + finalI);
}
return finalI;
}, executor).handle((result, e) -> {
if (e != null) {
sb.append("CompletableFuture处理异常 -> " + e.getMessage());
throw new RuntimeException("CompletableFuture处理异常 -> " + e.getMessage());
}
resultList.add(result);
return result;
});
futureList.add(future);
}
CompletableFuture<Void> completableFuture = CompletableFuture.allOf(futureList.toArray(new CompletableFuture[0]));
if (completableFuture.isCompletedExceptionally()) {
throw new RuntimeException(sb.toString());
} else {
resultList.addAll(futureList.stream().map(CompletableFuture::join).collect(Collectors.toList()));
}
System.out.println(resultList);
}
测试结果
业务异常... -> 4
main 处理了异常 -> java.lang.RuntimeException: CompletableFuture处理异常 -> java.lang.RuntimeException: 业务异常... -> 4
方法执行时间: 4036
main 执行完了...
方法执行时间是4秒,某一个线程发生了异常, 直接返回, 无需等待所有的线程执行完才返回。
此方法使用注意点:
- 使用
StringBuffer
代替StringBuilder
,避免并发安全问题; - CompletableFuture处理的异常都会封装成java.util.concurrent.CompletionException抛出,会丢失原有异常类信息。
- 方法3
解决异常被封装成java.util.concurrent.CompletionException问题,提取异常信息正确使用方法:e.getCause().getMessage()
。
private static void doSomething2(ThreadPoolExecutor executor, List<Integer> resultList, List<CompletableFuture<Integer>> futureList) {
StringBuffer sb = new StringBuffer();
for (int i = 0; i < 10; i++) {
int finalI = i;
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(finalI * 1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
if (finalI == 4) {
System.out.println("业务异常... -> " + finalI);
throw new RuntimeException("业务异常... -> " + finalI);
}
return finalI;
}, executor).handle((result, e) -> {
if (e != null) {
sb.append("CompletableFuture处理异常 -> " + e.getCause().getMessage());
throw new RuntimeException("CompletableFuture处理异常 -> " + e.getMessage());
}
resultList.add(result);
return result;
});
futureList.add(future);
}
CompletableFuture<Void> completableFuture = CompletableFuture.allOf(futureList.toArray(new CompletableFuture[0]));
if (completableFuture.isCompletedExceptionally()) {
throw new RuntimeException(sb.toString());
} else {
resultList.addAll(futureList.stream().map(CompletableFuture::join).collect(Collectors.toList()));
}
System.out.println(resultList);
}
相关文章
暂无评论...