黑马程序员技术交流社区

标题: 【杭州校区】CompletableFuture API详解 [打印本页]

作者: 小江哥    时间: 2019-11-27 13:04
标题: 【杭州校区】CompletableFuture API详解
本帖最后由 小江哥 于 2019-11-27 13:06 编辑

CompletableFuture API详解





这篇文章介绍了CompletableFuture 类的功能和一些使用实例。在我们介绍开始之前,先来了解一下这个类的背景。在JAVA中,一个异步任务的调用可以使用Threads。然而,为了获得最佳性能,需要仔细规划业务流程中的各个步骤的编排,这对于不了解JAVA整个并发体系的人来说,非常容易出错。如果JAVA提供了一个即用的容器来连接一系列任务,并且能为任务的运行提供并发性但是却不用编写复杂的多线程代码呢?CompletableFuture就是这样一个别致的小东西。
创建CompletableFuture对象。  我们可以直接new一个对象出来,也可以使用CompletableFuture为我们提供的静态方法。
  注意:这种方法直接new出来的CompletableFuture对象是无法运行的,因为他并没有处于一个“完成”状态,也就是说你调用get()方法是会被阻塞的。
CompletableFuture futrue = new CompletableFuture();  推荐下面这种方法,使用CompletableFuture的静态方法completedFuture(U value)。直接会拿到一个“完成”状态的对象,当用get()方法拿值时,你拿到的值也就是value。
String expectedValue = "the expected value";CompletableFuture<String> alreadyCompleted = CompletableFuture.completedFuture(expectedValue);assertThat(alreadyCompleted.get(), is(expectedValue));开始运行我们的第一个Task这里有2种方法来初始化我们的异步任务——使用runAsync()或者supplyAsnync()。先上一段代码:
CompletableFuture<Void> cf = CompletableFuture.runAsync(() -> {                System.out.printf("[%s] I am Cool\n", Thread.currentThread().getName());            });CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> {                System.out.printf("[%s] Am Awesome\n", Thread.currentThread().getName());                return null;            });打印结果:
[ForkJoinPool.commonPool-worker-3] I am Cool[ForkJoinPool.commonPool-worker-3] Am Awesome  从上面的例子中可以看出,有2种初始化CompletableFuture对象并运行我们的异步任务的方法。使用runAsync()supplyAsync()。可以很容易的看出来两者之间的差别,supplyAsync()有返回值,这个返回值可以用于被下一个任务链结点所消费,后面我们会讲到。除此之外,上述方法还提供了重载方法,当我们传入Executor时,该Task会使用传入的Executor去执行,否则默认去执行任务的线程池就是fork-join thread pool,关于该线程池,暂不赘述。
PS:我个人觉得supplyAsync方法中传入Callable比传入Supplier更合适。它们俩都是函数式接口,但是Callable和异步任务的联系更紧,并且可以抛出非运行时异常。
构造任务链  上述方法中我们只是异步的去执行了一个任务,如果我们想拿到这个任务的执行结果,并执行后面的任务呢?或者当该任务运行抛出异常时我们想来处理这些异常时呢(后面讲)?
  CompletableFuture为我们提供了几十种方法来构造任务链,这些任务链的构造过程类似于Stream.map()方法,下面将结合实例详细讲解。
Tip:该任务有两种方式来执行。第一种是如果当前线程调用thenAccept方法时,上一个任务还没执行完成时(这并不影响任务链执行的顺序性,因为这些任务都被存放在一个容器中),这时候调用此任务的线程就是上一个执行上任务的线程(ForkJoinPool-Thread);第二种是如果当前线程调用该方法时,上一个任务执行完成,这时候调用到了thenAccept方法,那么此任务会被调用thenAccept方法的线程(Main)所调用
请看下面的例子:
while (true) {            {                CompletableFuture cf = CompletableFuture.supplyAsync(() -> "I am Cool").thenAccept(msg ->                        System.out.printf("[%s] %s and am also Awesome\n", Thread.currentThread().getName(), msg));                try {                    cf.get();                } catch (Exception ex) {                    ex.printStackTrace(System.err);                }            }        }可以看到有多个运行结果:
[ForkJoinPool.commonPool-worker-2] I am Cool and am also Awesome[main] I am Cool and am also AwesomeCompletableFuture cf = CompletableFuture.supplyAsync(() -> "I'm Awesome")/*这里的msg就是上一个异步任务的返回结果:I'm Awesome*/                .thenApply(msg -> String.format("%s and am Super COOL !!!", msg))                .thenAccept(msg -> System.out.printf("%s\n", msg));输出结果:
I'm Awesome and am Super COOL !!!对比于thenApply,thenAccept更适用于作为任务链的结尾。
ExecutorService executor = Executors.newFixedThreadPool(4);CompletableFuture cf = CompletableFuture.supplyAsync(() -> "I'm Stunning", executor)                .thenCombineAsync(CompletableFuture.supplyAsync(() -> "am New !!!"),                    (s1, s2) -> String.format("%s AND %s", s1, s2), executor)                .thenAcceptAsync(msg ->                 System.out.printf("[%s] %s\n", Thread.currentThread().getName(), msg), executor);输出结果:
[pool-1-thread-3] I'm Stunning AND am New !!!注意这里用的是自定义线程池。当线程池在任务执行结束之前shutdown则会抛出rejectedExecution。
CompletableFuture cf = CompletableFuture.supplyAsync(() -> "I'm Smart")                .thenCompose(s -> CompletableFuture.supplyAsync(() -> s + " & am NIMBLE !!!"))                .thenAccept(msg ->                     System.out.printf("[%s] %s\n", Thread.currentThread().getName(), msg));输出结果:
I'm Smart & am NIMBLE !!!thenCompose VS thenApply
这两个方法都是接收一个参数并且返回的是CompletableFuture对象。它们的不同之处在于Function的返回值,thenCompose返回的是CompletableFuture,是一个你自己已经包装好的对象;而thenApply返回的是值,它底层会将这个值包装成对象返回给你。这就类似于Optianal中faltMap()和map()之间的区别。如果你想链接一个已经存在的返回CompletableFuture的方法,thenCompose是一个更好的选择,如下:
CompletableFuture<Integer> computeAnother(Integer i){    return CompletableFuture.supplyAsync(() -> 10 + i);}CompletableFuture<Integer> finalResult = compute().thenCompose(this::computeAnother);CompletableFuture cf1 = CompletableFuture.supplyAsync(() -> "I am Fast");CompletableFuture cf2 = CompletableFuture.supplyAsync(() -> "am Nimble !!!");CompletableFuture cf3 = cf1.thenAcceptBoth(cf2, (s1, s2) -> System.out.printf("[%s] %s and %s\n", Thread.currentThread().getName(), s1, s2));输出结果:
[main] I am Fast and am Nimble !!!CompletableFuture cf1 = CompletableFuture.supplyAsync(() -> {        randomDelay();        return "I am Awesome";            });CompletableFuture cf2 = CompletableFuture.supplyAsync(() -> {        randomDelay();        return "I am Cool";            });CompletableFuture cf3 = cf1.acceptEither(cf2, msg ->System.out.printf("[%s] %s and am NIMBLE !!!\n", Thread.currentThread().getName(), msg));可能的输出结果:
[ForkJoinPool.commonPool-worker-9] I am Awesome and am NIMBLE !!!或者[ForkJoinPool.commonPool-worker-2] I am Cool and am NIMBLE !!!            CompletableFuture cf1 = CompletableFuture.supplyAsync(() -> {                randomDelay();                return "I am Awesome";            });            CompletableFuture cf2 = CompletableFuture.supplyAsync(() -> {                randomDelay();                return "I am Bold";            });            CompletableFuture cf3 = cf1.applyToEither(cf2, msg -> String.format("%s and am Cool !!!", msg))                    .thenAccept(msg -> System.out.printf("[%s] %s\n", Thread.currentThread().getName(), msg));可能出现以下结果
[ForkJoinPool.commonPool-worker-9] I am Awesome and am Cool !!!或者[ForkJoinPool.commonPool-worker-2] I am Bold and am Cool !!!值得注意的是:以上两种"Either"方法在执行完成时,另一个还没执行完的任务会终止执行。
还有2个静态方法和Either,Both类方法大同小异下面来考一考大家,请看下面一道代码题,输出该任务链的最终值:
        Function<String,CompletableFuture<String>> upperCaseFunction = s -> CompletableFuture.completedFuture(s.toUpperCase());        CompletableFuture<String> stage1 = CompletableFuture.completedFuture("the quick ");        CompletableFuture<String> stage2 = CompletableFuture.completedFuture("brown fox ");        CompletableFuture<String> stage3 = stage1.thenCombine(stage2,(s1,s2) -> s1+s2);        CompletableFuture<String> stage4 = stage3.thenCompose(upperCaseFunction);        //simulatedTask第一个参数为执行时间,第二个参数为返回值。        CompletableFuture<String> stage5 = CompletableFuture.supplyAsync(simulatedTask(2,"jumped over"));        CompletableFuture<String> stage6 = stage4.thenCombineAsync(stage5,(s1,s2)-> s1+s2,service);        CompletableFuture<String> stage6_sub_1_slow = CompletableFuture.supplyAsync(simulatedTask(4,"fell into"));        CompletableFuture<String> stage7 = stage6.applyToEitherAsync(stage6_sub_1_slow,String::toUpperCase,service);        CompletableFuture<String> stage8 = CompletableFuture.supplyAsync(simulatedTask(3," the lazy dog"),service);        CompletableFuture<String> finalStage = stage7.thenCombineAsync(stage8,(s1,s2)-> s1+s2,service);答案会放在文末。
至此,我们讲述了绝大大部分构造任务链的方法,这些方法能让我们不断地向后传递不同的返回值,并且保证了任务链的顺序性。
任务链完成时的回调和异常处理在讲本节内容之前我们先来看2个方法。
我们知道上面两种方法在调用时都会阻塞当前线程,如果想继续向下运行,则必须中断或取消任务,但是如果我们想正常的结束任务,或者在拿值时如果值不存在,则抛出自定义的异常或返回默认值呢?CompletableFuture中为我们提供以下解决方法。
CompletableFuture cf1 = CompletableFuture.supplyAsync(() -> {                /*循环*/                infiniteLoop();                return "I am Awesome";            });System.out.println("complete:"+cf1.complete("Default"));System.out.println("isDone:"+cf1.isDone());System.out.println("result:"+cf1.join());输出结果:
complete:trueisDone:trueresult:Default            CompletableFuture cf1 = CompletableFuture.supplyAsync(() -> {                /*循环*/                infiniteLoop();                return "I am Awesome";            });            System.out.println("complete:"+cf1.completeExceptionally(new RuntimeException("completeExceptionally")));            System.out.println("isDone:"+cf1.isDone());            System.out.println("isCompletedExceptionally:"+cf1.isCompletedExceptionally());            System.out.println("result:"+cf1.join());输出结果:
complete:trueisDone:trueException in thread "main" isCompletedExceptionally:truejava.util.concurrent.CompletionException: java.lang.RuntimeException: completeExceptionally    at java.util.concurrent.CompletableFuture.reportJoin(CompletableFuture.java:375)    at java.util.concurrent.CompletableFuture.join(CompletableFuture.java:1934)    at CompletableFutureSample.main(CompletableFutureSample.java:27)Caused by: java.lang.RuntimeException: completeExceptionally    at CompletableFutureSample.main(CompletableFutureSample.java:24)上述方法在构建我们不想等待太多时间的健壮系统时很有用。
            CompletableFuture<String> cf1 = CompletableFuture.complete(user).thenApply((user) -> {                return user.getName();            });            cf1.exceptionally(ex -> ex.getMessage())            /*如果抛出了异常,这里传递给下一个结点的值是ex.getMessage()*/            .thenAccept(System.out::println);            /*如果使用get或join拿里面的值的话,如果任务有异常,会抛出CompletionException异常的*/            System.out.println("isDone:" + cf1.isDone());            System.out.println("isCompletedExceptionally:" + cf1.isCompletedExceptionally());输出结果:
/*user为空时的打印结果*/java.lang.NullPointerExceptionisDone:trueisCompletedExceptionally:true/*正常的打印结果*/JackisDone:trueisCompletedExceptionally:false CompletableFuture<String> cf1 = CompletableFuture.complete(user).thenApply((user) -> {                return user.getName();            }).handle((v,ex)->{                if(v==null)                    return "user is null";                return v;            })            .thenAccept(System.out::println);            /*如果使用get或join拿里面的值的话,如果任务有异常,会抛出CompletionException异常的*/            System.out.println("isDone:" + cf1.isDone());            /*使用handle时,无论出不出异常,该值都为false。*/            System.out.println("isCompletedExceptionally:" + cf1.isCompletedExceptionally());/*user为空时的打印结果*/user is null!isDone:trueisCompletedExceptionally:false/*正常的打印结果*/JackisDone:trueisCompletedExceptionally:falseCompletableFuture<String> cf1 = CompletableFuture.complete(user).thenApply((user) -> {                return user.getName();            }).handle((v,ex)->{                if(v==null)                    System.out.println("user is null");                else                    System.out.println(v);            })            .thenAccept(System.out::println);输出结果和上例相同,为了简洁就不展示了。
Async MethodsCompletableFuture API为绝大部分的方法提供了2个额外的方法变体,它们后缀名都是“Async”。这些异步方法都会使用线程池来执行,如果传入的线程池为null,则还会使用默认的fork/join pool来执行任务,这可以更有效率地提高你任务的并发性。
总结  这篇文章我们总结了CompletableFuture API的功能,从创建对象,到构造任务链,最后再到异常的处理。
答案
THE QUICK BROWN FOX JUMPED OVER the lazy dog



转自作者:梅肯羅斯
链接:https://www.jianshu.com/p/558b090ae4bb









欢迎光临 黑马程序员技术交流社区 (http://bbs.itheima.com/) 黑马程序员IT技术论坛 X3.2