CompletableFuture异常优雅处理方式

2年前 (2022) 程序员胖胖胖虎阿
215 0 0

CompletableFuture是Java 8提供了新特性,实现了CompletionStage接口和Future接口,增加了异步回调、流式处理、多个Future组合处理的能力。可以帮助我们简化异步编程的复杂性,让我们在Java在处理多任务的协同工作时更加顺畅便利。以下是记录一次使用CompletableFuture优化业务代码的技巧。

  • 需求场景:

业务有一接口获取数据,数据需要在一个for loop中查询的数据,每个查询都是独立的互不影响,而且不能使用批量查询。当有一个查询抛出正常的业务异常时,直接把异常抛出无需继续往下查询。如果每一查询成功,则返回一个结果list。

  • 方案梳理
    • for loop下逐个查询,时间复杂度时O(n);
    • for loop增加线程池处理,理想的时间复杂度是O(1);

明显使用线程池可以提升查询响应时间,值得注意是这里是要求有返回结果的。我们可以使用future.get()来阻塞获取线程池的结果包括异常。

但是,仅仅通过future.get()来获取结果就很容踩到一些坑的:

  1. future.get()获取的异常经过包裹,很难区分出业务正常抛出的异常;
  2. for loop下future.get(),需要阻塞等等所有线程执行完毕,没有很好满足单一线程抛出业务异常就返回结果。

下面就使用CompletableFuture来优化for loop下线程池处理业务以及异常的处理方法。

  • 方法前准备

    线程池executor、resultList(并发下注意使用安全容器)、futureList

private static final ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("my-pool-%d").get();
private static final ThreadPoolExecutor executor = new ThreadPoolExecutor(10, 10, 10, TimeUnit.SECONDS, new LinkedBlockingQueue<>(100), threadFactory, new ThreadPoolExecutor.CallerRunsPolicy());
// private static final List<Integer> resultList = new ArrayList<>();
private static final List<Integer> resultList = Collections.synchronizedList(new ArrayList<>());
private static final List<CompletableFuture<Integer>> futureList = new ArrayList<>();
  • 测试方法

记录方法执行时间和异常发生的时间

public static void main(String[] args) {
    long start = System.currentTimeMillis();
    try {
        doSomething(executor, resultList, futureList);
    } catch (Exception e) {
        System.out.println("main 处理了异常 -> " + e.getMessage());
    }
    System.out.println("方法执行时间: " + (System.currentTimeMillis() - start));
    System.out.println("main 执行完了...");
    executor.shutdown();
}
  • 方法1
private static void doSomething(ThreadPoolExecutor executor, List<Integer> resultList, List<CompletableFuture<Integer>> futureList) {
    for (int i = 0; i < 10; i++) {
        int finalI = i;
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(finalI * 1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            if (finalI == 4) {
                System.out.println("业务异常... -> " + finalI);
                throw new RuntimeException("业务异常... -> " + finalI);
            }
            return finalI;
        }, executor).handle((result, e) -> {
            if (e != null) {
                System.out.println("CompletableFuture处理异常 -> " + e.getMessage());
                throw new RuntimeException("CompletableFuture处理异常 -> " + e.getMessage());
            }
            resultList.add(result);
            return result;
        });
        futureList.add(future);
    }

    CompletableFuture.allOf(futureList.toArray(new CompletableFuture[0])).join();
    System.out.println(resultList.size());
    System.out.println(resultList);
}

测试结果

业务异常... -> 4
CompletableFuture处理异常 -> java.lang.RuntimeException: 业务异常... -> 4
main 处理了异常 -> java.lang.RuntimeException: CompletableFuture处理异常 -> java.lang.RuntimeException: 业务异常... -> 4
方法执行时间: 9081
main 执行完了...

方法执行时间是9秒,说明即使某一个线程发生了异常, 也要等待所有的线程执行完才返回。

  • 方法2
private static void doSomething2(ThreadPoolExecutor executor, List<Integer> resultList, List<CompletableFuture<Integer>> futureList) {
    StringBuffer sb = new StringBuffer();
    for (int i = 0; i < 10; i++) {
        int finalI = i;
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
                try {
                    Thread.sleep(finalI * 1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                if (finalI == 4) {
                    System.out.println("业务异常... -> " + finalI);
                    throw new RuntimeException("业务异常... -> " + finalI);
                }
                return finalI;
            }, executor).handle((result, e) -> {
                if (e != null) {
                    sb.append("CompletableFuture处理异常 -> " + e.getMessage());
                    throw new RuntimeException("CompletableFuture处理异常 -> " + e.getMessage());
                }
                resultList.add(result);
                return result;
            });
            futureList.add(future);
    }

    CompletableFuture<Void> completableFuture = CompletableFuture.allOf(futureList.toArray(new CompletableFuture[0]));
    if (completableFuture.isCompletedExceptionally()) {
        throw new RuntimeException(sb.toString());
    } else {
        resultList.addAll(futureList.stream().map(CompletableFuture::join).collect(Collectors.toList()));
    }
    System.out.println(resultList);
}

测试结果

业务异常... -> 4
main 处理了异常 -> java.lang.RuntimeException: CompletableFuture处理异常 -> java.lang.RuntimeException: 业务异常... -> 4
方法执行时间: 4036
main 执行完了...

方法执行时间是4秒,某一个线程发生了异常, 直接返回, 无需等待所有的线程执行完才返回。

此方法使用注意点:

  1. 使用StringBuffer代替StringBuilder,避免并发安全问题;
  2. CompletableFuture处理的异常都会封装成java.util.concurrent.CompletionException抛出,会丢失原有异常类信息。
  • 方法3

解决异常被封装成java.util.concurrent.CompletionException问题,提取异常信息正确使用方法:e.getCause().getMessage()

private static void doSomething2(ThreadPoolExecutor executor, List<Integer> resultList, List<CompletableFuture<Integer>> futureList) {
    StringBuffer sb = new StringBuffer();
    for (int i = 0; i < 10; i++) {
        int finalI = i;
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
                try {
                    Thread.sleep(finalI * 1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                if (finalI == 4) {
                    System.out.println("业务异常... -> " + finalI);
                    throw new RuntimeException("业务异常... -> " + finalI);
                }
                return finalI;
            }, executor).handle((result, e) -> {
                if (e != null) {
                    sb.append("CompletableFuture处理异常 -> " + e.getCause().getMessage());
                    throw new RuntimeException("CompletableFuture处理异常 -> " + e.getMessage());
                }
                resultList.add(result);
                return result;
            });
            futureList.add(future);
    }

    CompletableFuture<Void> completableFuture = CompletableFuture.allOf(futureList.toArray(new CompletableFuture[0]));
    if (completableFuture.isCompletedExceptionally()) {
        throw new RuntimeException(sb.toString());
    } else {
        resultList.addAll(futureList.stream().map(CompletableFuture::join).collect(Collectors.toList()));
    }
    System.out.println(resultList);
}

版权声明:程序员胖胖胖虎阿 发表于 2022年10月20日 下午4:16。
转载请注明:CompletableFuture异常优雅处理方式 | 胖虎的工具箱-编程导航

相关文章

暂无评论

暂无评论...