1. 创建CompletableFuture
函数名 |
备注 |
supplyAsync |
参数Supplier 无参数有返回值 |
runAsync |
参数Runnable 无参数无返回值 |
@Slf4j
public class MyTest {
private final Executor executors = Executors.newFixedThreadPool(10);
@Test
public void test01() throws ExecutionException, InterruptedException {
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> 1, executors);
CompletableFuture<Void> future1 = CompletableFuture.runAsync(() -> System.out.println("Runnable"),executors);
System.out.println(future.get());
}
}
2. 串行化方法
函数名 |
支持异步(带Async后缀) |
备注 |
thenApply |
√ |
有参数有返回值 Function |
thenAccept |
√ |
有参数无返回值 Consumer |
thenRun |
√ |
无参数无返回值 Runnable |
不带有Async表示默认使用当前线程(主线程执行)
带有Async后缀的表示可以使用其他的线程池执行异步任务,默认使用ForkJoinPool,可以进行指明自己的线程池(推荐)
@Slf4j
public class MyTest {
private final Executor executors = Executors.newFixedThreadPool(10);
private Consumer<Object> consumer = i -> log.info(String.valueOf(i));
@Test
public void test02() throws ExecutionException, InterruptedException {
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> 1, executors);
future.thenApply(i -> i + 1);
future.thenAccept(consumer);
future.thenRun(() -> log.info("Hello"));
future.thenAcceptAsync(consumer, executors);
}
}
3. 任务最终处理
函数名 |
支持异步(带Async后缀) |
备注 |
whenComplete |
√ |
仅处理结果返回原始CompletableFuture |
exceptionally |
× |
仅处理异常并返回默认值 |
handle |
× |
处理结果和异常并返回默认值 |
如果中间步骤出现异常,后续的链式链接不起作用,直接走到异常部分,并返回默认值
@Slf4j
public class MyTest {
private final Executor executors = Executors.newFixedThreadPool(10);
private Consumer<Object> consumer = i -> log.info(String.valueOf(i));
@Test
public void test03() {
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> 1, executors);
future.whenComplete((i, throwable) -> {
consumer.accept(i);
consumer.accept(throwable);
});
future.thenApply(integer -> {consumer.accept(integer); return 10/ 0;})
.exceptionally(throwable -> {
consumer.accept(throwable);
return 10;
});
future.thenApply(integer -> 10 / 0)
.handle((integer, throwable) -> {
consumer.accept(integer);
consumer.accept(throwable);
return 10;
});
}
}
4. 任务组合
两个任务组合
函数名 |
支持异步(带Async后缀) |
备注 |
thenCombine |
√ |
处理两个结果并返回值 |
thenAcceptBoth |
√ |
处理两个结果无返回值 |
runAfterBoth |
√ |
无参数无返回结果 |
@Slf4j
public class MyTest {
private final Executor executors = Executors.newFixedThreadPool(10);
private Consumer<Object> consumer = i -> log.info(String.valueOf(i));
@Test
public void test04() {
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> 1, executors);
CompletableFuture<Integer> newFuture = CompletableFuture.supplyAsync(() -> 2, executors);
future.thenCombineAsync(newFuture, (integer, integer2) -> integer + integer2, executors);
future.thenAcceptBothAsync(newFuture, (integer, integer2) -> { consumer.accept(integer + integer2);});
future.runAfterBothAsync(newFuture, () -> log.info("Runnable"));
}
}
任意一个任务完成
函数名 |
支持异步(带Async后缀) |
备注 |
applyToEither |
√ |
处理两个结果并返回值 |
acceptEither |
√ |
处理两个结果无返回值 |
runAfterEither |
√ |
无参数无返回结果 |
@Slf4j
public class MyTest {
private final Executor executors = Executors.newFixedThreadPool(10);
private Consumer<Object> consumer = i -> log.info(String.valueOf(i));
@Test
public void test04() {
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> 1, executors);
CompletableFuture<Integer> newFuture = CompletableFuture.supplyAsync(() -> 2, executors);
future.applyToEither(newFuture, integer -> integer);
future.acceptEither(newFuture, integer -> consumer.accept(integer));
future.runAfterEither(newFuture, () -> log.info("Runnable"));
}
}
扁平化任务
函数名 |
支持异步(带Async后缀) |
备注 |
thenCompose |
× |
类似于Stream的flatMap |
@Slf4j
public class MyTest {
private final Executor executors = Executors.newFixedThreadPool(10);
private Consumer<Object> consumer = i -> log.info(String.valueOf(i));
@Test
public void test04() {
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> 1, executors);
CompletableFuture<Integer> newFuture = CompletableFuture.supplyAsync(() -> 2, executors);
future.thenCompose(integer -> CompletableFuture.supplyAsync(() -> integer * 10).thenApply(i -> i / 10));
}
}
多任务组合
函数名 |
支持异步(带Async后缀) |
备注 |
allOf |
× |
静态方法,完成全部任务 |
anyOf |
× |
静态方法,完成任意一个任务 |
@Slf4j
public class MyTest {
private final Executor executors = Executors.newFixedThreadPool(10);
private Consumer<Object> consumer = i -> log.info(String.valueOf(i));
@Test
public void test04() {
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> 1, executors);
CompletableFuture<Integer> newFuture = CompletableFuture.supplyAsync(() -> 2, executors);
CompletableFuture<Void> allOf = CompletableFuture.allOf(future, newFuture);
CompletableFuture<Object> anyOf = CompletableFuture.anyOf(future, newFuture);
}
}