Java语言之三 - 线程池和Future

2018-12-01  本文已影响0人  Wu杰语

读完源码,发现JAVA的线程池和Future好像是耦合在一起的,通过阅读和查找资料,发现这是和JAVA的发展有关的。分为三个阶段:


image.png

从源码来看, 按照上述图片,分为了三代实现,实现之间的界限还是非常明显的。

第一代ThreadPoolExecutor和ScheduleThreadPoolExcutor

框架原理

image.png

框架中有两个队列作为主要数据结构一个是BlockedQueue,另外一个set
* BlockingQueue用于存储任务事件,BlockingQueue是一个比较重要的结构,用于消息队列
* WorkerThread是一个封装起来的Worker作为线程,封装了丰富的控制动作

如下构造函数

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue)

阿里的编程规范中,要求不允许使用Excutors这个辅助类,而必须自己使用构造函数来new一个线程池,必须自己来掌握线程池的各种参数配置来配置合适的线程池。

参数中注意三个关键的参数和上述提到的数据结构相关:

掌握这些参数,根据不同的场景配置不同的参数。例如说大数据量计算的场景(可以使用ForkJoinPoolExcutor),该配置多少线程呢?配置线程太多,会导致空转,锁资源等浪费,配置太少,会导致算力不足。这个时候一般情况下配置和CPU数量相等,每个CPU都参加运算,配置太多是没有必要的。同理,如果是处理文件,有较多的IO,如果配置线程太少,就可能会有大量任务得不到线程处理,所以这时一般经验配置2*Count(CPU)。

ScheduleThreadPoolExcutor

这个Excutor和ThreadPoolExcutor的不同:

  1. 消息队列使用的是DelayBlockingQueue,这个队列实现了定时功能,每个任务都有一个时间值,只有该时间值到达的时候,该任务才能被取出。
  2. ScheduleFutureTask,任务调度是,塞进去的是这种FutureTask,这里FutureTask执行后会根据定时间隔,在DelayBlockingQueue中塞入一个新的任务。

Future

Future在线程池中有较多的应用,在ThreadPoolExecutor中submit接口,提交一个任务,讲返回一个Future,用FutrureTask包装这个任务。
FutureTask是个微缩版的AbstractQueueSynchrogazer,实现了线程异步功能,当调用get的时候,主线程将会阻塞,作为一个节点加入到FutureTask的waiter队列中,只有FutureTask完成的时候,会讲waiter队列的线程unpark,此时主线程又重新运行。

Future上述三个迭代的技术中都有应用,但是原理上就是上述所述。

第二代ForkJoinPool

forkjoinpool是个单机版的mapreduce的实现,实现上还是按照ThreadPoolExcutor实现的,但是实现细节有很大的不同。

ForkJoinPool中的核心线程是 ForkJoinWorkerThread,默认会有8个这样的核心线程,不同之处在于,每个线程都会有一个任务队列,这个队列是个双端的栈。

ForkJoin的主要函数式computation,实现computation,在computation中fork和join,即可实现计算。fork是增加一个新的任务到队列中

    public final ForkJoinTask<V> fork() {
        Thread t;
        if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
            ((ForkJoinWorkerThread)t).workQueue.push(this);
        else
            ForkJoinPool.common.externalPush(this);
        return this;
    }

join的代码如下:

    private int doJoin() {
        int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w;
        return (s = status) < 0 ? s :
            ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
            (w = (wt = (ForkJoinWorkerThread)t).workQueue).
            tryUnpush(this) && (s = doExec()) < 0 ? s :
            wt.pool.awaitJoin(w, this) :
            externalAwaitDone();
    }

Join的代码逻辑是先讲fork出来的新任务执行,执行完成后再wait,由于是嵌套的,执行新任务的时候,又会fork出新的任务,在join,这是forkjoin难以理解的关键。

第三代CompletableFuture

CompletableFuture是为了解决地狱递归问题,也是函数式的应用。从源码中看到,CompletableFuture中有两个关键数据结构,

在CompletableFuture的函数,有两类应用:

其中有几种函子:

有几种组合逻辑:

函子和组合逻辑是不是特别眼熟,没错,这就是函数式编程的东西。

小结

按照这三代来理解线程池框架,对线程池就会非常清晰。不会产生ForkJoinPool为何和ThreadPoolExecutor格格不入的困惑。同时理解了AQS的实现,就会明白这三代的实现也是在AQS基础上的一脉相承的实现,就会感觉比较通透。

上一篇 下一篇

猜你喜欢

热点阅读