Java 学习

Java8并发编程(使用CompletableFuture)

2018-03-28  本文已影响609人  周一不上班

java8异步处理

非阻塞IO/异步/并行

非阻塞指线程处理异步任务时,当异步任务获取到数据时使用回调函数处理数据,而不是CPU空闲等待数据返回后再处理。

使用场景

Restful服务往往部署在不同的物理机器上,通过Http协议进行调用,如果直接在主线程中进行IO操作,主线程将阻塞以等待请求完成。而Java原生多线程编程繁琐而且容易出错,线程池也无法完美解决主线程阻塞的问题,Future以同步代码风格编写异步调用,提供主线程无需阻塞的方法,对于微服务架构是一个很好的并发解决方案。

在实际的web项目中,传统的服务器后端,往往使用线程池做请求处理,每一个请求到来独占一个线程,当涉及io操作时,线程阻塞以等待IO完成,当IO耗时太长或者IO操作太频繁时,线程长时间无法释放,导致线程池可用线程不足,后续的请求被迫排队,服务器吞吐量受到严重影响。

另一个场景不太常见,一段代码中包含多个计算密集型任务(如超大矩阵计算)时,在同一个线程中线性执行使得整段代码运行时间过长,这时使用异步执行可以适当加速可并行执行的代码。

scala事件驱动

Scala的Future和Promise使用 ExecutionContext 管理异步计算任务,宏观上控制并行的粒度。你可以把它看成是一个线程池,它给任务分配新的线程或者在本线程下执行,虽然不推荐这种降级为线性执行的方式。

ExecutionContextForkJoinPool 实现线程管理,有特殊需求时需要特别配置 ForkJoinPool 的最大线程数。最大线程数设置一般参照CPU核数。虽然线程很轻量,但是当CPU线程数远小于活跃的线程数量时,线程切换频繁发生时的开销还是挺可观的。这时候并行粒度太小,并行太多反而会降低性能。

Future有两种状态

完成状态有两种方式

Java CompletableFuture Demo

github项目代码

使用前提:java8

Java8吸收了Scala的Future和其他基于事件驱动的异步调用框架,比如Netty的ChannelFuture等的优点,在原先的Future接口之上实现了CompletableFuture。

CompletableFuture 通过回调函数,实现非阻塞的IO的异步调用。

使用场景:Restful服务往往部署在不同的物理机器上,通过Http协议进行调用,如果直接在主线程中进行IO操作,主线程将阻塞以等待请求完成。而Java原生多线程编程繁琐而且容易出错,线程池也无法完美解决主线程阻塞的问题,Future以同步代码风格编写异步调用,提供主线程无需阻塞的方法,对于微服务架构是一个很好的并发解决方案。

全部Demo代码

Main.java

import java.io.IOException;

importjava.util.concurrent.CompletableFuture;

public class Main {
    public static void main(String[] args throws Exception {
        HttpTask task = new HttpTask();

        System.out.println("main threadstarts in thread id: "+Thread.currentThread().getId());

        CompletableFuturefutureNonBlocking =CompletableFuture.supplyAsync(()-> {
            try {
                return task.doHtt("https://guazi.com");
            } catch (IOException e) {
                e.printStackTrace();
            } catch (InterruptedExceptione) {
                e.printStackTrace();
            }
            return "error";
        });
        CompletableFuture futureBlocking= CompletableFuture.supplyAsync(( -> {
            try {
                return task.doHtt("https://guazi.com");
            } catch (IOException e) {
                e.printStackTrace();
            } catch (InterruptedExceptione) {
                e.printStackTrace();
            }
            return "error";
        });

        // 非阻塞
        System.out.printl("-----------------非阻塞位1----------------------\n");
        // future.thenAcceptAsync() 方法阻塞本线程,http请求成功后执行回调函数
        futureNonBlocking.thenAcceptAsyn(result -> System.out.printl("\nfrom non blocking future:\n+result+"\n"));
        System.out.printl("-----------------非阻塞位2----------------------\n");

        // 阻塞
        System.out.println("\nfromblocking future:\n+futureBlocking.get()+"\n");
        // future.get() 方法阻塞本线程,直http请求成功
        System.out.printl("-----------------阻塞位1----------------------\n");
        System.out.println("main threadends");
    }
}

HttpTask.java

import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;

import java.io.IOException;

public class HttpTask {
    public String doHttp(String url)throws IOException,InterruptedException {
        long threadId =Thread.currentThread().getId();
        OkHttpClient client = newOkHttpClient();
        System.out.println("http iostarts in thread id: "+ threadId);

        Request request = newRequest.Builder()
                .url(url)
                .build();

        Response response = client.newCal(request).execute();
        String result = response.headers(.toString();

        // mock delay
        for (int i=1; i<=3; ++i){
            // 阻塞本线程1秒
            Thread.sleep(1000);
            System.out.println("delayed + i + " seconds in thread id:"+ threadId);
        }

        return "******** headers inthread id "+threadId+"***********\n"+resul+"***************** headers******************";
    }
}

输出结果

终端输出

本Demo中有三个线程执行,已通过线程ID标识,非阻塞线程(这里线程id为10)异步执行,结果通过回调函数处理。线程id为11的同样异步执行,但是Future接口的get()方法使主线程(id为1)阻塞以等待结果到达。

总结

线程10 → 异步 + 非阻塞IO
线程11 → 异步 + 阻塞IO

推荐使用 异步 + 非阻塞IO 的方式,这也是 Java1.8CompletableFutureJava1.5Future 的不同之处。

可以看到:

阻塞位置1 总是在 blocking futureget()方法之后执行。

使用姿势


不关心返回结果

这种情况毋庸置疑: 异步 + 非阻塞

必须获取到返回结果程序才能继续执行

这种情况下有两种选择

  1. 异步 + 阻塞 (前文已述) 说明一点,这种方式并不是完全失去了性能上的提升,因为在另一进程在未执行完成时,本进程不是必须在空等,而是可以做自己的数据处理,两个进程并发执行,虽然有时候快的那个要等待慢的那个,但是这比非异步也是有很大的性能提升的.如果实际场景中返回结果耗时太长,比如下载批量图片,请使用方法2
  2. 使用回调函数,类似javascript对异步的处理. 这种方法可以实现 异步+非阻塞的效果,但是复杂逻辑场景中会出现回调函数嵌套层数增加的混乱. trade-offs :-) :-)

参考链接

上一篇下一篇

猜你喜欢

热点阅读