java收集Java并发多线程&线程池&ForkJoin框架JAVA_CONCURRENT

【Java并发编程】—–“J.U.C”:Executor体系结构

2016-03-08  本文已影响1032人  Peter潘的博客

前言

在以前使用线程池的时候,都是简单的调用API。对于线程池体系结构原理都没有深究,直到最近在看看Netty的线程池源码时发现其都是在JDK线程池的基础上做了进一步封装,因此个人觉得应该好好深入学习一下JDK本身的线程池,一来可以看看Doug Lea大神的设计,二来对于后续深入分析Netty的线程模型也有比较大的帮助。
本文所包含的内容主要由以上两方面(如有分析不对,请拍砖):

1.为什么使用线程池

子曰:“利用有限的资源处理无限的任务”。

众所周知,创建线程的开销相比于创建进程要小很多。然而当我们创建一个线程在执行任务的过程中,不仅要消耗CPU的时间和内存资源,同时线程其自身对应的VM栈、本地方法栈、程序计数器也同样会占据内存空间,因此线程的创建与销毁依旧还是一件“奢侈”的事情。因此,如果我们为每一个执行的任务都单独的创建一个线程来执行,可能会创建执行任务的时间要远远小于创建线程所花费时间,很显然这是一种“浪费的表现”。其次如果我们没有对线程的数据量加以限制,那么无限制的创建势必会引发堆内存溢出,从而引发OOM(OutOfMemory)。

由于上述各种原因,线程池出现了,它可以对创建的线程进行复用,从而可以减少线程和销毁线程的时间;其次,由于使用了线程池之后,线程的管理将不在由用户所处理,它可以很好控制线程的数量,从而预防OOM的发生。

2.JDK线程池的体系结构

线程池的继承体系.png

上图标红的3个接口,是构成JDK线程池体系的核心接口,了解这些核心组成接口无论对于我们在使用以及后面的源码分析都有很大帮助,因此下面我也将针对这3个接口进行详细的分析。

2.1 Executor(解耦利器)

在Executor接口中只定义了一个方法:

 /*
    该方法的目的就是用于执行提交的给定任务,而任务可能不是立刻执行。 
     底层具体执行执行的线程可能是新创建的线程、或者线程池中已经存在的线程,甚至可以是当前调用线程,这一切都取决于具体的实现。
     如果Executor无法接受提交的任务,则抛出RejectedExecutionException。
     如果提交的任务是null,则抛出NullPointerException
 */
    void execute(Runnable command);

定义Executor的目的是为了完成任务的提交与任务的执行之间的解耦,对用户屏蔽底层线程的实现与调度细节,这是一种典型命令设计模式的应用。

2.2 ExecutorService(可关闭的线程池)

ExecutorService在Executor的基础上,提供了更加丰富的功能。

(1). 线程池的关闭
在ExecutorService中,分别提供了shutdown()和shutdownNow()方法用于线程池的关闭。

    /*
      拒绝之后提交的任务
      但是在线程池的销毁前,会让之前提交的任务继续执行。
    */
    void shutdown();
    
    
    /*
      拒绝之后提交的任务,并且会停止正在等待执行的任务执行
      还将停止当前正在执行的任务
      注意:该方法返回的任务列表是正在等待执行的任务列表
    */    
     List<Runnable> shutdownNow();
     

如果线程池一旦处于Termination的状态,那么线程池中将不存在任何正在执行的任务、等待执行的任务,同时任务也不能再被提交。

(2). 提交带返回值的任务

    /*
      提交带返回值的任务(Callable),返回一个Future对象,
      通过Future可以取消正在执行的任务或者等待任务的结束
    */
    <T> Future<T> submit(Callable<T> task);
    
    /*
        提交Runnable任务执行,返回Future。
        一旦任务结束,Future的get方法会返回null,
        否则将一直处于等待状态
    */
    <T> Future<T> submit(Runnable task);
    
    
    /*
      提交Runnable任务执行,并且指定返回值。
      注意:由于Runnable任务是没有返回值的,JDK的线程池
      实现类在底层会将Runnable统一的将通过一个适配器转成
      Callable来进行执行。因此传入的参数result,就是Future
      的get方法所得到的结果。
    */
    <T> Future<T> submit(Runnable task, T result);

(3). 批量提交任务

    /*
      批量执行一组任务集合,当所有的任务全部执行完毕之后才返回。
      返回的结果为一个存放Future的List,其中每一个元素都保存
      了其对应任务的执行结果和状态。Future的isDone()方法都是返回true
      特别注意:任务的结束可能有两种情况,一是正常执行完毕,或者
      由于在执行的过程中抛出了异常而结束。
      如果抛出异常,只有在返回的Future调用get方法是才会抛出异常
      否则,线程池会将异常隐藏。
      (执行下面的TestBulkExecuteTask,默认情况下程序会正常退出)
    
    */
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException;
 
    /*
     批量执行一组任务,并且显示等待任务的结束。
     当所有任务执行完毕或者超时返回。
     Future的isDone()方法都是返回true。
     如果一旦超时返回,那么未执行完的任务都会被取消。
     如果任务因为取消而结束,在调用Future的get方法时会引发CancellationException。
    */
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException;

    /*
      执行给定的任务集合,返回任务中成功执行完成的一个任务的返回值。 
      比如有3个任务,第一个任务由于异常而结束,此时不会返回,
      而是等待其他两个任务中的某一个成功执行完成才返回。(参考TestInvokeAny)
      如果所有的任务都由于异常而结束,此时会抛出异常。
      无论是成功返回还是由于异常而返回,都不会取消正在执行的任务    
    */
    <T> T invokeAny(Collection<? extends Callable<T>> tasks)
        throws InterruptedException, ExecutionException;
      
    /*
       执行给定的任务集合,并且指定在给定的时间内等待获取任务中成功执行完成的一个任务的返回值。
       一旦超时,依然还没有一个任务成功结束,则抛出TimeoutException。
       但是此时不会取消正在执行的任务。  
    
    */
    <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                    long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;        

package concurrency.threadpool;

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/**
 * 提交批量任务到线程池执行
 */
public class TestBulkExecuteTask {

    public static void main(String[] args) throws Exception {

        ExecutorService threadPool = Executors.newFixedThreadPool(5);
        List<Future<Integer>> futures = threadPool.invokeAll(Arrays.asList(
                new Task1(), new Task2(),new Task1()));
        
        /*
         * 批量执行Callable任务,当任务由于异常而结束,线程池并不会将异常抛出
         * 只有当通过Future获取任务的返回结果时才会真正抛出异常
         */
    //      for (Future<Integer> future : futures) {
    //          System.out.println(future.get());
    //      }
        
        threadPool.shutdown();
    }

    static class Task1 implements Callable<Integer> {
        @Override
        public Integer call() throws Exception {
            Thread.sleep(1000);
            System.out.println(Thread.currentThread().getName()+" execute!");
            return (int) (Math.random() * 1000);
        }
    }

    static class Task2 implements Callable<Integer> {

        @Override
        public Integer call() throws Exception {
            //抛出的异常会被线程池隐藏
            throw new RuntimeException("执行出错");
        }
    }
}

通过上面的例子可以看到,在提交Callable任务时,我们应该使用get方法去检测任务是否正常执行,否则即使任务由于异常而终止,我们也不得而知。

package concurrency.threadpool;
import java.util.Arrays;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class TestInvokeAny {

    public static void main(String[] args) throws Exception {

        ExecutorService threadPool = Executors.newFixedThreadPool(5);
        //如果3个任务都是抛出异常的,此时线程池会抛出异常
        Integer value = threadPool.invokeAny(
                Arrays.asList(new Task1(), new Task1(),new Task2()));
        System.out.println(value);
        threadPool.shutdown();
    }

    static class Task1 implements Callable<Integer> {
        @Override
        public Integer call() throws Exception {
            Thread.sleep(1000);
            System.out.println(Thread.currentThread().getName()+" execute!");
            return (int) (Math.random() * 1000);
        }
    }

    static class Task2 implements Callable<Integer> {
        @Override
        public Integer call() throws Exception {
            // 抛出的异常会被线程池隐藏
            throw new RuntimeException("执行出错");
        }
    }
}

2.2 ScheduledExecutorService(可调度的线程池)

ScheduledExecutorService是一个可以对任务进行调度的ExecutorService,它可以对任务延迟执行或者周期性地去执行任务。ScheduledExecutorService中所定义的方法都返回一个ScheduledFuture对象,它可以判断当前任务是否已经执行完毕或者取消任务的执行。

    /*
     在给定的延迟时候后执行一个不带返回值的任务,该任务只会被执行一次。
     如果传入的时间小于等于0(参考下面的说明1,分析了JDK的线程池对传入的延迟时间的触发),那么任务会立即触发执行。
     由于传入的是Runnable接口,因此ScheduleFuture的get方法返回的结果是null。
    */
     public ScheduledFuture<?> schedule(Runnable command,long delay, TimeUnit unit);
    
    /*
     在给定的延迟时候后执行一个带返回值的任务,该任务同样也只会被执行一次。     
    ScheduleFuture的get方法返回的结果是将是Callable执行返回的结果。
    */
     public <V> ScheduledFuture<V> schedule(Callable<V> callable,long delay, TimeUnit unit);
    
     /*
      创建任务并且以固定频率周期性地执行。
      初始任务的开始执行是在给定的initialDelay之后;
      而后续第1个任务的开始于initialDelay+2*period,
      第2个任务开始于initialDelay+2*period,以此类推...
      如果一个任务的执行时间大于给定的执行周期,那么会导致其后面的一个任务执行时间被延后。如果发生这种情况,后面的任务会在当前任务执行完毕后立即执行,JDK的线程池不会这样延迟而新创建一个线程让后面的任务与当前任务并发执行。
      参考(TestScheduleAtFixedRate和TestscheduleAtFixedRate2)
      如果任务在执行中的由于异常而停止,那么后续的任务也都不会再执行
      (因此对于调度的任务,我们在编写程序时必须确保异常能够得到正确处理,从而避免因此异常而导致整个调度任务的终结),
      否则如果想要停止定时任务的话,只能通过ScheduledFuture来取消任务或者关闭线程池。
     */
     public ScheduledFuture<?> scheduleAtFixedRate( command,long initialDelay,long period,TimeUnit unit);


     /*
      创建任务并且以固定的延迟时间周期性地执行。    
      初始任务的开始执行是在给定的initialDelay之后;
      而后续的任务都将在上一个任务执行完毕后,
      再经过delay,然后开始执行。
      对于该方法,如果在调度的过程中发生异常,那么也将导致后续任务的终结。
      (参考TestScheduledWithFixedDelay)
     */
     public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,long initialDelay,long delay,TimeUnit unit);


说明1:JDK底层的实现会将传入的时间转换成纳秒来进行处理,可以查看java.util.concurrent.ScheduledThreadPoolExecutor第489行代码

    /**
     * 返回一个延迟任务的触发时间
     */
    private long triggerTime(long delay, TimeUnit unit) {
        return triggerTime(unit.toNanos((delay < 0) ? 0 : delay));
    }

说明2:上面的TimeUnit是JDk 1.5所引入的枚举,它里面定义了天、小时、分钟、秒、毫秒、微秒、纳秒这些单位以及它们之间的互相转换的方法。

在分析了ScheduledExecutorService之后,下面我们通过代码再来看看这些方法之间的差异。

package concurrency.threadpool;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/**
 * 固定延迟时间地周期性调度
 */
public class TestScheduledWithFixedDelay {

    public static void main(String[] args) {
        ScheduledExecutorService scheduledThreadPool = Executors
                .newScheduledThreadPool(1);
        System.out.println("start schedule: " + System.currentTimeMillis()
                / 1000);
        /*
         * scheduleWithFixedDelay执行说明: 第一个任务的执行是在给定的延迟时间后执行,这里的延迟时间为2秒
         * 后续的任务是在上一个任务执行完成后,经过delay后继续执行(这里的delay为3秒)
         */
        ScheduledFuture<?> future = scheduledThreadPool.scheduleWithFixedDelay(
                new Task(), 2, 3, TimeUnit.SECONDS);
    }

    static class Task implements Runnable {
        @Override
        public void run() {
            try {
                System.out.println("currentTime: "
                        + (System.currentTimeMillis() / 1000) + ","
                        + Thread.currentThread().getName() + " doSomething!");
                //休息5秒
                Thread.sleep(6000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}


TestScheduleFixDelay结果.png
package concurrency.threadpool;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class TestScheduleAtFixedRate {

    public static void main(String[] args) {
        ScheduledExecutorService scheduledThreadPool = Executors
                .newScheduledThreadPool(1);
        
        System.out.println("Start schedule: "+System.currentTimeMillis()/1000);
        //初始任务执行延迟2秒,每隔4秒执行下一个任务
        scheduledThreadPool.scheduleAtFixedRate(new Task(), 2, 4,
                TimeUnit.SECONDS);
    }
    
    static class Task implements Runnable{
        @Override
        public void run() {
            try {
                System.out.println("currentTime: " + System.currentTimeMillis() /1000
                        + "," + Thread.currentThread().getName()
                        + " doSomething!");
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    
}
TestScheduleFixRate.png
package concurrency.threadpool;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class TestScheduleAtFixedRate2 {

    public static void main(String[] args) {
        ScheduledExecutorService scheduledThreadPool = Executors
                .newScheduledThreadPool(10);
        System.out.println("Start schedule: "+System.currentTimeMillis() /1000);
        /*
         * 任务的调度周期为4秒,但是每个任务的执行却是6秒,此时每个任务的执行时间都会被延后
         * 其执行的时间与上一个任务之间相差为6秒
         */
        scheduledThreadPool.scheduleAtFixedRate(new Task(), 2, 4,
                TimeUnit.SECONDS);
    }
    
    static class Task implements Runnable{
        @Override
        public void run() {
            try {
                System.out.println("currentTime: " + System.currentTimeMillis() / 1000
                        + "," + Thread.currentThread().getName()
                        + " doSomething!");
                //任务执行时间为6秒
                Thread.sleep(6000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    
}

TestScheduleFixRate2.png

好了,JDK线程池的体系结构就分析到这里,下篇内容将会分析ThreadPoolExecutor这个线程池的具体实现。

上一篇下一篇

猜你喜欢

热点阅读