Future机制在一定程度上都无法快速地满足以上需求,CompletableFuture便应运而生了。
本片会介绍CompletableFuture的api,并用一些示例演示如何去使用。
创建一个异步任务
- public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
- public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,Executor executor);
- public static CompletableFuture<Void> runAsync(Runnable runnable);
- public static CompletableFuture<Void> runAsync(Runnable runnable,Executor executor);
supplyAsync与runAsync的区别在于:supplyAsync有返回值,而runAsync没有返回值
带Executor参数的构造函数,则使用线程池中的线程执行异步任务(线程池可以参考说说线程池)
不带Executor参数的构造函数,则使用ForkJoinPool.commonPool()中的线程执行异步任务(Fork/Join框架可以参考谈谈并行流parallelStream)
示例:使用supplyAsync创建一个有返回值的异步任务
- public class Case1 {
- public static void main(String[] args) throws Exception {
- CompletableFuture<Integer> completableFuture=CompletableFuture.supplyAsync(()->{
- try {
- Thread.sleep(1000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- return 1;
- });
- //该方法会一直阻塞
- Integer result = completableFuture.get();
- System.out.println(result);
- }
- }
异步任务的回调
- public CompletableFuture<T> whenComplete(BiConsumer<? super T, ? super Throwable> action);
- public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action);
- public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action, Executor executor);
- public CompletableFuture<T> exceptionally(Function<Throwable, ? extends T> fn);
whenComplete开头的方法在计算任务完成(包括正常完成与出现异常)之后会回调
而exceptionally则只会在计算任务出现异常时才会被回调
如何确定哪个线程去回调whenComplete,比较复杂,先略过。
而回调whenCompleteAsync的线程比较简单,随便拿一个空闲的线程即可,后缀是Async的方法同理。
示例:计算出现异常,使用whenComplete与exceptionally进行处理
- package com.qcy.testCompleteableFuture;
- import java.util.concurrent.CompletableFuture;
- import java.util.concurrent.ExecutionException;
- import java.util.function.BiConsumer;
- import java.util.function.Function;
- import java.util.stream.IntStream;
- /**
- * @author qcy
- * @create 2020/09/07 17:40:44
- */
- public class Case2 {
- public static void main(String[] args) throws Exception {
- CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
- try {
- Thread.sleep(1000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- System.out.println("执行supplyAsync的线程:" + Thread.currentThread().getName());
- int i = 1 / 0;
- return 1;
- });
- completableFuture.whenComplete(new BiConsumer<Integer, Throwable>() {
- @Override
- public void accept(Integer integer, Throwable throwable) {
- System.out.println("执行whenComplete的线程:" + Thread.currentThread().getName());
- if (throwable == null) {
- System.out.println("计算未出现异常,结果:" + integer);
- }
- }
- });
- completableFuture.exceptionally(new Function<Throwable, Integer>() {
- @Override
- public Integer apply(Throwable throwable) {
- //出现异常时,则返回一个默认值
- System.out.println("计算出现异常,信息:" + throwable.getMessage());
- return -1;
- }
- });
- System.out.println(completableFuture.get());
- }
- }