并发编程(5)ThreadPoolExecutor原理解析

2017-12-16  本文已影响139人  wustor

概述

由于线程的创建跟销毁是比较消耗资源的,也是比较耗时的。可能为了程序的需要,我们会创建很多线程,所以很有必要对线程进行一个统一的管理,所以就出现了线程池。通过线程池,我们可以重复利用一些线程资源,同时可以统一管理应用内的线程,防止内存泄露。

运行机制

Executor

当我们创建一个任务之后,放进线程池之后,线程池会做如下判断

继承关系

ScheduledThreadPoolExecutor

Executor 接口定义了线程池最基本的方法,提交Runnable 任务

public interface Executor {
    void execute(Runnable command);
}

ExecutorService 扩充了提交任务的类型,并且定义了线程池关闭任务的方法。

ExecutorService

AbstractExecutorService 是抽象类,主要是对ExecutorService 的一些具体实现
ThreadPoolExecutor 是最核心的一个类,下面会具体分析其源码。
ScheduledThreadPoolExecutor则是在 在 ThreadPoolExecutor 的基础上增加了时间调度的功能

成员变量

     //32-3=29,线程数量所占位数
    private static final int COUNT_BITS = Integer.SIZE – 3;    
    //低29位表示最大线程数,2的29次幂-1
    private static final int CAPACITY = (1 << COUNT_BITS) – 1;    
    //线程池自身的状态
    
    //符号位101
    private static final int RUNNING    = -1 << COUNT_BITS;
    //高3位000
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
     //高3位001
    private static final int STOP       =  1 << COUNT_BITS;
   //高3位010
    private static final int TIDYING    =  2 << COUNT_BITS;
     //高3位011
    private static final int TERMINATED =  3 << COUNT_BITS;
    //缓存队列,等待中的线程任务队列
    private final BlockingQueue<Runnable> workQueue;
    //线程池中工作的线程集合
    private final HashSet<Worker> workers = new HashSet<>();
    //最大线程数
    private int largestPoolSize;
    //完成任务的线程数量
    private long completedTaskCount;
    //创建线程池的工厂类
    private volatile ThreadFactory threadFactory;
    //线程池丢弃策略
    private volatile RejectedExecutionHandler handler;
    //在等待执行任务的线程的最大等待时间
    private volatile long keepAliveTime;
    //核心线程数
    private volatile int corePoolSize;
    //线程池最大可容纳的线程数
    private volatile int maximumPoolSize;
    //默认的线程丢弃策略
    private static final RejectedExecutionHandler defaultHandler =
            new AbortPolicy();
    //int 型变量,低3位表示线程池状态,剩余的位数表示最大线程数
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

构造方法

constructors

其实前面的三个构造方法最终都调用了最后一个构造方法,所以就来看看最后一个构造方法

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler);
}

参数比较多,下面来解释一下

时间单位 解释
TimeUnit.DAYS
TimeUnit.HOURS 小时
TimeUnit.MINUTES 分钟
TimeUnit.SECONDS
TimeUnit.MILLISECONDS 毫秒
TimeUnit.MILLISECONDS 微妙
TimeUnit.NANOSECONDS 纳秒
BlockingQueue

阻塞队列,如果BlockingQueue是空的,从BlockingQueue取东西的操作将会被阻断进入等待状态,直到BlockingQueue进了东西才会被唤醒,同样,如果BlockingQueue是满的,任何试图往里存东西的操作也会被阻断进入等待状态,直到BlockingQueue里有空间时才会被唤醒继续操作。

threadFactory::创建线程池的工厂

RejectedExecutionHandler: 线程丢弃策略,常见的有如下几种

丢弃策略 解释
DiscardPolicy 丢弃任务,但是不抛出异常
CallerRunsPolicy 由调用线程处理该任务
AbortPolicy 丢弃任务并抛出RejectedExecutionException
DiscardOldestPolicy 丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)

execute方法

提交Runnable任务

execute方法

    public void execute(Runnable command) {
        //为空的话,抛出空指针异常
        if (command == null)
            throw new NullPointerException();
        int c = ctl.get();
        //判断加入当前任务后,线程数是否小于核心线程数
        if (workerCountOf(c) < corePoolSize) {
        //如果小于核心线程数,则将立即执行当前任务
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        //核心线程数满了之后,先判断线程池的状态,然后判断缓存队列是否还能进行缓存
        if (isRunning(c) && workQueue.offer(command)) {
            //如果两者都满足条件,则将线程加入缓存队列
            int recheck = ctl.get();
            //如果当前任务没有在运行已经被移除,执行线程丢弃策略
            if (!isRunning(recheck) && remove(command))
                reject(command);
             // 正在运行的线程数如果是0,则直接运行当前线程
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        //加入当前任务失败,则执行丢弃策略
        else if (!addWorker(command, false))
            reject(command);
    }

addWorker方法

有两个参数,一个是firstTask,表示加入的Runnable任务,一个是core,表示是否添加到核心线程。

    private boolean addWorker(Runnable firstTask, boolean core) {
        retry://定义了循环的名称,便于后面直接中断循环
        for (;;) {
            int c = ctl.get();//获取状态与数量的标志位
            int rs = runStateOf(c);//判断线程状态
            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
//线程池处于关闭状态,firstTask为null,或者缓存队列为空,返回false
                return false;
                //死循环
            for (;;) {
                int wc = workerCountOf(c);//获取线程池数量
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    //比对核心线程数与最大线程数
                    return false;
                if (compareAndIncrementWorkerCount(c))
                //添加线程成功,中断循环
                    break retry;
                c = ctl.get();  //重新获取线程状态与数量的标志位
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            //创建一个心的worker
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
              final ReentrantLock mainLock = this.mainLock;
               //给当前线程上锁
                mainLock.lock();
                try {
            //获取当前线程池状态跟线程数的标记为
              int rs = runStateOf(ctl.get());
               if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
        if (t.isAlive()) // precheck that t is startable
        throw new IllegalThreadStateException();
                        //将worker添加到缓存队列中去
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                         //添加成功,改变标记位
                        workerAdded = true;
                    }
                } finally {
                //释放锁
                    mainLock.unlock();
                }
                if (workerAdded) {
                    //添加成功之后,开启线程执行任务
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
            //添加失败,调用addWorkerFailed方法
               addWorkerFailed(w);
        }
        return workerStarted;
    }

submit

FutureTask

继承关系

先看一下FutureTask的继承关系


FutureTask

Runnable
很常见的接口,定义了run方法

public interface Runnable {
  public abstract void run();
}

Future
带有返回值的泛型接口

public interface Future<V> {
    boolean isCancelled();//任务是否取消
    boolean isDone();//任务是否完成
    //同步方法,任务执行的返回值
    V get() throws InterruptedException, ExecutionException;
    //timeout后获取等待结果
    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
        }

RunnableFuture
继承自Runnable,Future的泛型返回接口

public interface RunnableFuture<V> extends Runnable, Future<V> {
    void run();
}

Callable
//带有返回值的Runnable额接口

public interface Callable<V> {
  V call() throws Exception;
}
构造方法

Callable构造方法

   public FutureTask(Callable<V> callable) {
        if (callable == null)
            throw new NullPointerException();
        this.callable = callable;
        this.state = NEW;    
    }

Runnable+Result构造方法

public FutureTask(Runnable runnable, V result) {
   this.callable = Executors.callable(runnable, result);
   this.state = NEW;   
    }

可以看到不管是Callable还是Runnable构造方法,最后都是使用Callable来进行构造的,之所以这么做,是因为FutureTask需要返回值

提交任务

提交Runnable任务

public Future<?> submit(Runnable task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<Void> ftask = newTaskFor(task, null);
        execute(ftask);
        return ftask;
    }

提交callable任务+返回值


    /**
     * @throws RejectedExecutionException {@inheritDoc}
     * @throws NullPointerException       {@inheritDoc}
     */
    public <T> Future<T> submit(Runnable task, T result) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task, result);
        execute(ftask);
        return ftask;
    }

提交callable任务

    public <T> Future<T> submit(Callable<T> task) {
         //首先判断是否为空
        if (task == null) throw new NullPointerException();
        //将Callable转换成Future
        RunnableFuture<T> ftask = newTaskFor(task);
        //执行execute方法
        execute(ftask);//最后依然会调用execute Runnable方法
        return ftask;
    }

不管是提交什么样task,最后都会被包装成Runnable方法来执行,还是会调用Executor的execute方法。

tryTerminate

这个方法是当线程池关闭的时候会调用

    final void tryTerminate() {
     //开启死循环
        for (;;) {
            //获取线程池状态跟数量的标志位
            int c = ctl.get();
            //判断三个条件
            //1.线程是否在运行
            //2.线程池状态小于TIDYING,TERMINATED
            //3.线程池已经关闭并且队列为空
            满足上面的任意一个条件就会直接返回,很好理解
            if (isRunning(c) ||
                runStateAtLeast(c, TIDYING) ||
                (runStateOf(c) == SHUTDOWN && !   workQueue.isEmpty()))
                return;
            if (workerCountOf(c) != 0) { 
            // 如果线程数不为0,才有资格去终止
                interruptIdleWorkers(ONLY_ONE);
                return;
            }
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
              if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                    try {
                    //CAS设置状态成功,调用terminated,默认空实现
                        terminated();
                    } finally {
                        ctl.set(ctlOf(TERMINATED, 0));
                        termination.signalAll();
                    }
                    return;
                }
            } finally {
                mainLock.unlock();
            }
            // else retry on failed CAS
        }
    }

advanceRunState();

更改当前线程池的状态

  private void advanceRunState(int targetState) {
        for (;;) {
        //获取当前线程的状态及数量的标志位
            int c = ctl.get();
        //更改线程状态
            if (runStateAtLeast(c, targetState) ||
                ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
                break;
        }
    }

shutdown

 public void shutdown() {
         //锁住线程池
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();
    // 线程池状态设为SHUTDOWN,如果已经至少是这个状态那么则直接返回
            advanceRunState(SHUTDOWN);
       // 中断等待中的线程
            interruptIdleWorkers();
            onShutdown();
        } finally {
        //释放锁
            mainLock.unlock();
        }
        tryTerminate();
    }

shutdownNow

    public List<Runnable> shutdownNow() {
        List<Runnable> tasks;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();
          //传入Stop状态,其余跟shutdown保持一致
            advanceRunState(STOP);
          //中断所有线程
            interruptWorkers();
            tasks = drainQueue();
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
        return tasks;
    }

shutdownNow() 和 shutdown()的大体流程相似,差别是:

Executors

Executors是Java提供的一个线程池的帮助类,可以帮助我们快速的处理线程池。

构造线程池

由于Java的线程池的构造方法比较复杂,所以Java又提供了Executors这个辅助类,帮助我们更快速地创建ThreadPoolExecutor,可以帮助我们创建4种类型的ThreadPool


Executors_create

Callable转换

executors_change

参考资料

https://www.cnblogs.com/trust-freedom/p/6693601.html
https://zhuanlan.zhihu.com/p/27232156

上一篇下一篇

猜你喜欢

热点阅读