所有分类
  • 所有分类
  • 未分类

Java异步–CompletableFuture–实例

简介

说明

本文介绍CompletableFuture这个异步编程API。

JDK8新加CompletableFuture,实现了Future<T>, CompletionStage<T>两个接口。

其位置:java.util.concurrent.CompletableFuture;

Future与CompletableFuture对比

Future与CompletableFuture相比,主要缺点如下(下边这些功能CompletableFuture都有):

  1. 不支持手动完成
    1. 我提交了一个任务,但是执行太慢了,我通过其他路径已经获取到了任务结果,现在没法把这个任务结果,通知到正在执行的线程,所以必须主动取消或者一直等待它执行完成。
  2. 不支持回调函数
    1. 无法在获取任务之后执行额外的任务。因为Future的get方法会一直阻塞到任务完成,Future不支持回调函数,所以无法实现这个功能。
  3. 不支持链式调用
    1. 对于Future的执行结果,我们想继续传到下一个Future处理使用,从而形成一个链式的pipline调用,这在Future中是没法实现的。
  4. 不支持多个Future合并
    1. 比如我们有10个Future并行执行,我们想在所有的Future运行完毕之后,执行某些函数,是没法通过Future实现的。
  5. 不支持异常处理
    1. Future的API没有任何的异常处理的api,所以在异步运行时,如果出了问题是不好定位的。

与函数式接口的关系

CompletableFuture的方法名称与函数式接口有很大的关系,所以此处特地列出来。

接口定义方法说明
Runnablevoid run();参数:无。返回值:无。
Function< T, R >R apply(T t);参数:T对象。返回值:R对象。
Consumer< T >void accept(T t);参数:T对象。返回值:无
Supplier< T >T get();参数:无。返回值:T对象
BiConsumer<T, U>void accept(T t, U u);参数:T对象。返回值:boolean

创建

简介

CompletableFuture提供了如下四个静态方法来创建一个异步操作。

public static CompletableFuture<Void> runAsync(Runnable runnable)
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
  • runAsync:不支持返回值。因为它接收Runnable接口,Runnable是没有返回值的。
  • supplyAsync:支持返回值。因为它接收Supplier接口,Supplier是有返回值的。
  • 若executor参数没有设置值,那么会使用ForkJoinPool.commonPool默认线程池执行任务。实际业务中我们是严谨手动创建线程的。
  • ForkJoinPool 的线程数默认是 CPU 的核心数。但是,不要所有业务共用一个线程池,因为,一旦有任务执行一些很慢的 I/O 操作,就会导致线程池中所有线程都阻塞在 I/O 操作上,从而造成线程饥饿,进而影响整个系统的性能。

实例(无Executor) 

CompletableFuture<Void> future = CompletableFuture.runAsync(() -> System.out.println("runAsync"));
future.get();

执行结果:

runAsync

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    System.out.println("supplyAsync");
    return "supplyAsync的返回值";
});
System.out.println(future.get());

执行结果:

supplyAsync
supplyAsync的返回值

CompletableFuture.runAsync(() -> {
    System.out.println(Thread.currentThread().getName() + " 执行异步任务 runAsync");
});

String result = CompletableFuture.supplyAsync(() -> {
    System.out.println(Thread.currentThread().getName() + " 执行异步任务 supplyAsync");
    return "supplyAsync的返回值";
}).get();

System.out.println(result);

执行结果

ForkJoinPool.commonPool-worker-9 执行异步任务 runAsync
ForkJoinPool.commonPool-worker-9 执行异步任务 supplyAsync
supplyAsync的返回值

实例(有Executor) 

package org.example.a;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Demo {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService executor = Executors.newFixedThreadPool(10);

        CompletableFuture.runAsync(() -> {
            System.out.println(Thread.currentThread().getName() + " 执行异步任务 runAsync");
        }, executor);

        String result = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread().getName() + " 执行异步任务 supplyAsync");
            return "supplyAsync的返回值";
        }, executor).get();

        System.out.println(result);
    }
}

执行结果

pool-1-thread-1 执行异步任务 runAsync
pool-1-thread-2 执行异步任务 supplyAsync
supplyAsync的返回值

Future功能

CompletableFuture实现了Future接口,所以它有Future的所有功能。

package org.example.a;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class Demo {
    public static void main(String[] args) throws NoSuchFieldException, IllegalAccessException {
        CompletableFuture<String> completableFuture = new CompletableFuture<String>();
        Runnable runnable = new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(1000);
                    System.out.println("子线程执行");
                    completableFuture.complete("success");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        };
        Thread t1 = new Thread(runnable);
        t1.start();

        try {
            //主线程阻塞,等待完成
            String result = completableFuture.get();
            System.out.println("主线程获得结果: " + result);
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
    }
}

执行结果

子线程执行
主线程获得结果: success

顺序执行

顺序执行都是thenXxx

thenXxx: 在本线程中等待任务结束,然后执行下一步。
thenXxxAsync:在异步线程中等待任务结束,然后执行下一步。 

上边这两条的具体含义见下边“thenRun系列”的两个示例。本部分“顺序执行”的其他部分都是这样的。

CompletableFuture<Void> thenRun(Runnable action)
CompletableFuture<Void> thenRunAsync(Runnable action)
CompletableFuture<Void> thenRunAsync(Runnable action, Executor executor)
  
CompletableFuture<Void> thenAccept(Consumer<? super T> action) 
CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action)
CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action, Executor executor)

<U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)
<U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn)
<U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)
  
<U> CompletableFuture<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn)  
<U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn)
<U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn, Executor executor)

thenRun

说明

不接受参数,也不返回结果,不进不出

在本线程中等待

CompletableFuture
        .runAsync(() -> System.out.println(Thread.currentThread().getName() + " runAsync 执行"))
        .thenRun(() -> System.out.println(Thread.currentThread().getName() + " thenRun 执行"));

执行结果

ForkJoinPool.commonPool-worker-9 runAsync 执行
main thenRun 执行

在异步线程中等待

CompletableFuture
        .runAsync(() -> System.out.println(Thread.currentThread().getName() + " runAsync 执行"))
        .thenRunAsync(() -> System.out.println(Thread.currentThread().getName() + " thenRunAsync 执行"));

执行结果

ForkJoinPool.commonPool-worker-9 runAsync 执行
ForkJoinPool.commonPool-worker-9 thenRunAsync 执行

thenAccept

接受参数,但不返回结果,只进不出。

CompletableFuture
        .supplyAsync(() -> {
            System.out.println(Thread.currentThread().getName() + " runAsync 执行");
            return "supplyAsync返回值";
        })
        .thenAccept(i -> System.out.println(Thread.currentThread().getName() + " thenRunAsync 执行。上一步结果:" + i));

执行结果

ForkJoinPool.commonPool-worker-9 runAsync 执行
main thenRunAsync 执行。上一步结果:supplyAsync返回值

thenApply

接收参数,也输出结果。

CompletableFuture<String> completableFuture = CompletableFuture
        .supplyAsync(() -> {
            System.out.println(Thread.currentThread().getName() + " runAsync 执行");
            return "supplyAsync返回值";
        })
        .thenApply(i -> {
            System.out.println(Thread.currentThread().getName() + " thenRunAsync 执行。上一步结果:" + i);
            return "thenApply返回值";
        });
System.out.println(completableFuture.get());

执行结果

ForkJoinPool.commonPool-worker-9 runAsync 执行
ForkJoinPool.commonPool-worker-9 thenRunAsync 执行。上一步结果:supplyAsync返回值
thenApply返回值

thenCompose

将多个CompletableFuture组合。

CompletableFuture<String> future = CompletableFuture
        .supplyAsync(() -> {
            System.out.println(Thread.currentThread().getName() + " supplyAsync(第1个) 执行");
            return "supplyAsync(第1个)返回值";
        })
        .thenCompose(value ->
                CompletableFuture.supplyAsync(() -> {
                    System.out.println(Thread.currentThread().getName() + " supplyAsync(第2个) 执行");
                    return value + "; supplyAsync(第2个)返回值";
                })
        );

System.out.println(future.get());

执行结果

ForkJoinPool.commonPool-worker-9 supplyAsync(第1个) 执行
ForkJoinPool.commonPool-worker-9 supplyAsync(第2个) 执行
supplyAsync(第1个)返回值; supplyAsync(第2个)返回值

合并任务

thenCombine(合并两个/有返回值)

简介

合并两个没有依赖关系的CompletableFutures任务。

public <U,V> CompletionStage<V> thenCombine(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn);
public <U,V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn);
public <U,V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn,Executor executor);

实例

CompletableFuture<Integer>  future1= CompletableFuture.supplyAsync(()->{
    System.out.println(Thread.currentThread().getName() + " supplyAsync(第1个) 执行");
    return 1;
});

CompletableFuture<Integer>  future2= CompletableFuture.supplyAsync(()->{
    System.out.println(Thread.currentThread().getName() + " supplyAsync(第2个) 执行");
    return 2;
});

CompletableFuture<Integer> result=  future1.thenCombine(future2,(number1,number2)->{
    return  number1+number2;
});

System.out.println(result.get());

执行结果

ForkJoinPool.commonPool-worker-9 supplyAsync(第1个) 执行
ForkJoinPool.commonPool-worker-2 supplyAsync(第2个) 执行
3

allOf/anyOf(合并多个/有返回值)

上面说的是两个任务的合并,那么多个任务需要使用allOf或者anyOf方法。

allOf

会等待所有的任务执行完毕。

注意,allOf返回值为CompletableFuture<Void>型,所以无法获得返回值。

Random rand = new Random();
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
    try {
        Thread.sleep(rand.nextInt(1000));
        System.out.println(Thread.currentThread().getName() + " supplyAsync(第1个) 睡眠结束");
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return 100;
});
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
    try {
        Thread.sleep(rand.nextInt(1000));
        System.out.println(Thread.currentThread().getName() + " supplyAsync(第2个) 睡眠结束");
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return "abc";
});
CompletableFuture<Void> f = CompletableFuture.allOf(future1, future2);
f.get(); //会阻塞在这里,等待所有的任务执行完毕
System.out.println("所有任务执行完毕");

执行结果(因为是随即睡眠,所以下边结果中的第一行和第二行可能会调换顺序)

ForkJoinPool.commonPool-worker-2 supplyAsync(第2个) 睡眠结束
ForkJoinPool.commonPool-worker-9 supplyAsync(第1个) 睡眠结束
所有任务执行完毕

anyOf

等待任一任务执行完毕。

Random rand = new Random();
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
    try {
        Thread.sleep(rand.nextInt(1000));
        System.out.println(Thread.currentThread().getName() + " supplyAsync(第1个) 睡眠结束");
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return 100;
});
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
    try {
        Thread.sleep(rand.nextInt(1000));
        System.out.println(Thread.currentThread().getName() + " supplyAsync(第2个) 睡眠结束");
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return "abc";
});
CompletableFuture<Object> f =  CompletableFuture.anyOf(future1,future2);
System.out.println(f.get());

执行结果(因为是随即睡眠,所以下边结果中的第一行也可以能是第1个线程)

ForkJoinPool.commonPool-worker-2 supplyAsync(第2个) 睡眠结束
abc

thenAcceptBoth(合并两个/无返回值)

简介

两个CompletionStage都执行完成后,把结果一块交给thenAcceptBoth来进行消耗。

位置是相对应的。比如下边的例子中,由于future1调用thenAccept,将future2作为参数。此时,无论哪个先执行完,输出结果总是:abcdef。

实例

CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
    System.out.println(Thread.currentThread().getName() + " supplyAsync(第1个) 睡眠结束");
    return "abc";
});
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
    System.out.println(Thread.currentThread().getName() + " supplyAsync(第2个) 睡眠结束");
    return "def";
});
future1.thenAcceptBoth(future2, (x, y) -> System.out.println(x + y)).get();

执行结果

ForkJoinPool.commonPool-worker-9 supplyAsync(第2个) 睡眠结束
ForkJoinPool.commonPool-worker-2 supplyAsync(第1个) 睡眠结束
abcdef

acceptEither(合并两个/无返回值) 

简介

谁执行返回的结果快,就用那个CompletionStage的结果进行下一步的消耗操作。

注意:所有的CompletableFuture的返回值必须是一样的。比如下边,如果future1为CompletableFuture<Integer>,future2为CompletableFuture<String>则会直接报错。

实例

Random rand = new Random();
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
    try {
        Thread.sleep(rand.nextInt(1000));
        System.out.println(Thread.currentThread().getName() + " supplyAsync(第1个) 睡眠结束");
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return "abc";
});
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
    try {
        Thread.sleep(rand.nextInt(1000));
        System.out.println(Thread.currentThread().getName() + " supplyAsync(第2个) 睡眠结束");
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return "def";
});
future1.acceptEither(future2, x -> System.out.println(x)).get();

执行结果

ForkJoinPool.commonPool-worker-9 supplyAsync(第1个) 睡眠结束
abc

applyToEither(有返回值)

简介

两个CompletionStage,谁执行返回的结果快,就用那个CompletionStage的结果进行下一步的转化操作。

实例

CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
        System.out.println(Thread.currentThread().getName() + " supplyAsync(第1个) 执行");
    return "abc";
});
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
        System.out.println(Thread.currentThread().getName() + " supplyAsync(第2个) 执行");
    return "def";
});
String result = future1.applyToEither(future2, x -> {
    System.out.println(Thread.currentThread().getName() + " applyToEither 执行");
    return "此返回值所在的线程执行的快:" + x;
}).get();

System.out.println(Thread.currentThread().getName() + " 结果:" + result);

执行结果

ForkJoinPool.commonPool-worker-9 supplyAsync(第1个) 执行
ForkJoinPool.commonPool-worker-2 supplyAsync(第2个) 执行
main applyToEither 执行
main 结果:此返回值所在的线程执行的快:abc

回调/异常

CompletableFuture<T> whenComplete(BiConsumer<? super T, ? super Throwable> action)
CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action)
CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action, Executor executor)
       
<U> CompletableFuture<U> handle(BiFunction<? super T, Throwable, ? extends U> fn)
<U> CompletableFuture<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn)
<U> CompletableFuture<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn, Executor executor)


CompletableFuture<T> exceptionally(Function<Throwable, ? extends T> fn)
CompletableFuture<T> exceptionallyAsync(Function<Throwable, ? extends T> fn)
CompletableFuture<T> exceptionallyAsync(Function<Throwable, ? extends T> fn, Executor executor)

whenComplete(无返回值)

简介

接受2个参数,第一个参数为上一次任务的返回结果,第二个参数表示异常对象,该方法是只消费的,不会有返回结果。

whenComplete

异步子线程执行完之后,会在外部线程中执行接下来的操作。

CompletableFuture.supplyAsync(() -> {
    System.out.println(Thread.currentThread().getName() + " supplyAsync 执行");
    return "abc";
}).whenComplete((s, throwable) -> {
    System.out.println(Thread.currentThread().getName() + " whenComplete 执行");
    System.out.println("上一步的结果:" + s);
    System.out.println("上一步的异常:" + throwable.getMessage());
});

执行结果

ForkJoinPool.commonPool-worker-9 supplyAsync 执行
main whenComplete 执行
上一步的结果:abc
CompletableFuture.supplyAsync(() -> {
    System.out.println(Thread.currentThread().getName() + " supplyAsync 执行");
    int i = 1 / 0;
    return "abc";
}).whenComplete((s, throwable) -> {
    System.out.println(Thread.currentThread().getName() + " whenComplete 执行");
    System.out.println("上一步的结果:" + s);
    System.out.println("上一步的异常:" + throwable.getMessage());
});

执行结果

ForkJoinPool.commonPool-worker-9 supplyAsync 执行
ForkJoinPool.commonPool-worker-9 whenComplete 执行
上一步的结果:null
上一步的异常:java.lang.ArithmeticException: / by zero

whenCompleteAsync

异步子线程执行完之后,在这个异步子线程中执行接下来的操作。

CompletableFuture.supplyAsync(() -> {
    System.out.println(Thread.currentThread().getName() + " supplyAsync 执行");
    return "abc";
}).whenCompleteAsync((s, throwable) -> {
    System.out.println(Thread.currentThread().getName() + " whenComplete 执行");
    System.out.println("上一步的结果:" + s);
    System.out.println("上一步的异常:" + throwable.getMessage());
});

执行结果

ForkJoinPool.commonPool-worker-9 supplyAsync 执行
ForkJoinPool.commonPool-worker-9 whenComplete 执行
上一步的结果:abc

handle

和whenComplete方法对比

相同点:此方法也是接受2个参数,第一个参数为上一次任务的返回结果,第二个参数表示异常对象,

不同点:handle需要有返回结果,所以这个方法的使用场景可以是:尽管之前的执行的任务异常,仍然需要有默认返回值。

String result = CompletableFuture.supplyAsync(() -> {
    System.out.println(Thread.currentThread().getName() + " supplyAsync 执行");
    int i = 1 / 0;
    return "abc";
}).handle((s, throwable) -> {
    System.out.println(Thread.currentThread().getName() + " whenComplete 执行");
    System.out.println("上一步的结果:" + s);
    if (throwable != null) {
        System.out.println("上一步的异常:" + throwable.getMessage());
        return "产生异常了。";
    }
    return "上一步的返回值:" + s;
}).get();

System.out.println(result);

执行结果

ForkJoinPool.commonPool-worker-9 supplyAsync 执行
ForkJoinPool.commonPool-worker-9 whenComplete 执行
上一步的结果:null
上一步的异常:java.lang.ArithmeticException: / by zero
产生异常了。

exceptionally

简介

此方法和handle类似,不过它只接受一个参数即异常对象,且也是需要返回一个结果。

实例

String result = CompletableFuture.supplyAsync(() -> {
    System.out.println(Thread.currentThread().getName() + " supplyAsync 执行");
    int i = 1 / 0;
    return "abc";
}).exceptionally(throwable -> {
    System.out.println(Thread.currentThread().getName() + " exceptionally 执行");
    return "产生异常了";
}).get();

System.out.println(result);

执行结果

ForkJoinPool.commonPool-worker-9 supplyAsync 执行
ForkJoinPool.commonPool-worker-9 exceptionally 执行
产生异常了

线程阻塞

get()

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    System.out.println(Thread.currentThread().getName() + " supplyAsync 执行");
    return "abc";
});
String s = future.get();
System.out.println(s);

执行结果

ForkJoinPool.commonPool-worker-9 supplyAsync 执行
abc

join()

和get用法一样,都可以用来堵塞主线程,且可以获取到future的值,不同的是,当他们抛出异常时会有所区别。

1

评论0

请先

显示验证码
没有账号?注册  忘记密码?

社交账号快速登录