线程池

2020-08-19  本文已影响0人  我是许仙

线程池

提前创建好若干个线程,如果有任务需要处理,则直接分配任务给线程池中的线程,使用完线程不会销毁而是放回到线程池中。

  1. 线程的复用避免重复创建线程与销毁线程。
  2. 提高相应速度,没有了创建与销毁的操作。
  3. 颗粒的控制线程的大小,避免因为线程数超过硬件资源带来的瓶颈问题

juc的Executors提供了静态创建线程池的几种方法

 //Executor executors = Executors.newFixedThreadPool(3); //构建线程数量固定的线程池
 //Executor executor = Executors.newSingleThreadExecutor();//构建单个线程池
 //Executor executor = Executors.newCachedThreadPool(); //构建动态扩容的线程池
// Executor executor = Executors.newScheduledThreadPool();//构建有定时任务功能的线程池

提供的静态方法最终都调用了下面的构造方法

public ThreadPoolExecutor(int corePoolSize, //核心线程池 
                          int maximumPoolSize, //最大线程池
                          long keepAliveTime, //超出核心线程池的线程空闲存活的时间
                          TimeUnit unit, //keepAliveTime 的时间单位
                          BlockingQueue<Runnable> workQueue, //超出最大线程池数量,保存任务的地方
                          ThreadFactory threadFactory,//创建线程的类
                          RejectedExecutionHandler handler //当任务无法执行的时候的处理方式-拒绝策略
                         ) {
<span style='color:red'>不推荐线上环境直接使用静态方法创建线程池。</span>

因为提供的几种静态方法创建的线程池我们无法控制。

  1. newFixedThreadPool(int nThreads) 创建一个重复使用固定数量线程的线程池。这些线程用来消费共享的无边界的列队LinkedBlockingQueue 大小Integer.MAX_VALUE(2^32-1)。如果线程由于执行失败而导致关闭并不是调用的shutdown(),那么一个新的线程将会被创建替代它。<span style='color:red'>1. 虽然线程固定但是当任务太多线程消费太慢的时候会不断的往列队中插入导致内存溢出。2. 同时会导致任务处理变慢,因为当时列队容量足够大的时候,maxImumPoolSize失去了作用,相当于线程数量永远保持核心线程数,因为线程池的逻辑是线程创建核心线程然后插入列队,列队满了才会新增线程数量直到达到最大线程池数,因此当列队容量太大会导致线程池的动态扩容失去作用导致任务消费降低从而导致性能降低。</span>

  2. newSingleThreadExecutor() 创建只有一个线程的线程池。与newFixedThreadPool有同样的问题任务量太大会导致内存溢出。而且只有一个线程来执行任务并不能起到并发的效果。

  3. newCachedThreadPool 创建一个线程池,线程池中的线程随着任务的需要动态创建,最少0个多个创建2^32-1个,这种线程池能够显著的提升拥有许多短计算的异步任务的程序性能。因为线程无线创建嘛很好理解。当线程60秒内没有使用,线程将会被关闭并移除缓存。采用 SynchronousQueue来存储任务。<span style='color:red'> 当大量任务请求会导致线程无限创建,cup线程间切换开销大性能降低。</span>

  4. newScheduledThreadPool() 创建一个有调度功能的线程池。

    public ScheduledThreadPoolExecutor(int corePoolSize,
                                       ThreadFactory threadFactory) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue(), threadFactory);
    }
    

    跟newCachedThreadPool一样,最大线程池也是2的32次方-1。极限情况下一瞬间会把线程打满。

线程池流程图
image-20200814180039418.png
execute(Runable run) 执行一个任务
//获取核心线程
int c = ctl.get();
//1如果工作线程 < 核心线程数
if (workerCountOf(c) < corePoolSize) {
    //新增一个线程
    if (addWorker(command, true))
        return;
    c = ctl.get();
}
//2 往列队中添加一个任务 
if (isRunning(c) && workQueue.offer(command)) {
    int recheck = ctl.get();
    if (! isRunning(recheck) && remove(command))
        reject(command);
    //如果工作线程为0
    else if (workerCountOf(recheck) == 0)
        //添加工作线程 启动一个线程
        addWorker(null, false);
}
//3 新增任务 新增最大线程数 maximumPoolSize
else if (!addWorker(command, false))
    reject(command);
  1. 如果工作线程< 核心线程数 则新增一个线程去处理任务 相当于初始化
  2. 如果核心线程已经初始化完毕,怎添加任务到阻塞列队中
  3. 如果添加阻塞列队失败,则再次尝试新增线程。如果工作线程<maximumPoolSize 则会成功,如果失败则拒绝任务执行 线程池拒绝策略
keepAliveTime

空闲线程获取新任务等待的超时时间,默认的情况下,线程会一直阻塞相当于处于活跃状态。

有2中情况会用使用keepAliveTime属性。

  1. 工作线程数超过核心线程数的时候。
    1. setCorePoolSize() 方法
    2. maximumPoolSize > corePoolSize
  2. 如果allowCoreThreadTimeOut 属性为true的时候(在无任务的时候,会导致工作线程为0)

线程将等待keepAliveTime纳秒。不然将永远等待。其主要的核心原理是调用BlockingQueue 的2个方法

  1. workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) 超时等待任务
  2. workQueue.take() 阻塞直到获取任务
allowCoreThreadTimeOut 默认为false

线程池中维护了corePoolSize大小的线程,当没有任务的时候这些线程默认情况下会阻塞直到获取任务 (allowCoreThreadTimeOut = false) ,如果allowCoreThreadTimeOut = true则这些核心线程会等待keepAliveTime纳秒获取任务,如果没有任务返回则会关闭线程。一旦设置这个属性,并不会保存CorePoolSize数量的线程,当无任务会把所有的线程都关闭到0个。

  1. 坏处--当有新任务来的时候,再去new 一个线程去执行任务,这里会有增加创建线程的时间开销,失去了池的概念。
  2. 好处--停止了阻塞线程,释放了空闲线程的内存。

processWorkerExit() 工作线程退出

private void processWorkerExit(Worker w, boolean completedAbruptly) {
    if (completedAbruptly) //判断是否是突然的退出
        decrementWorkerCount();

    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        //所有线程执行任务总数相加 
        completedTaskCount += w.completedTasks;
        //移除工作线程
        workers.remove(w);
    } finally {
        mainLock.unlock();
    }
    //未研究
    tryTerminate();

    int c = ctl.get();
    if (runStateLessThan(c, STOP)) {
        if (!completedAbruptly) {
            //判断最小的线程
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            if (min == 0 && ! workQueue.isEmpty())
                min = 1;
            if (workerCountOf(c) >= min)
                return; // replacement not needed
        }
        //如果线程突然间中断了,新增一个核心线程
        addWorker(null, false);
    }
}
BlockingQueue<Runnable> workQueue 工作列队

一个阻塞列队,存储任务并移交给工作线程。keepAliveTimeallowCoreThreadTimeOutworkQueue3个属性定义了线程获取任务的行为。

Runable getTask()

//wc 可以任务是 现有的工作线程
private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?
        //自旋
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            //获取工作线程数
            int wc = workerCountOf(c);

            //是否清除线程的判断逻辑,1 现有的工作线程>核心线程  2 核心线程可超时获取
            //每次循序都会做判断
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
            
            //这里是释放线程的核心逻辑,对于超过核心线程数的线程且没有科工作的任务通过cas操作  
            //当前核心线程数减少1 并且返回null
            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                //1
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }

            try { //阻塞线程获取任务
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
               //处理中断信号进行 并处理
                timedOut = false;
            }
        }
    }
ThreadPool.setCorePoolSize()

设置核心线程的数量,如果新的值小于当前的值,过量的线程当它状态变成空闲的时候被停止。如果大于,创建新线程,并开始执行任务。

源码

public void setCorePoolSize(int corePoolSize) {
    if (corePoolSize < 0)
        throw new IllegalArgumentException();
    int delta = corePoolSize - this.corePoolSize;
    this.corePoolSize = corePoolSize;
    //如果运行线程大于核心线程
    if (workerCountOf(ctl.get()) > corePoolSize)
        //尝试中断空闲线程, 往下看
        interruptIdleWorkers();
    else if (delta > 0) {
        // We don't really know how many new threads are "needed".
        // As a heuristic, prestart enough new workers (up to new
        // core size) to handle the current number of tasks in
        // queue, but stop if queue becomes empty while doing so.
        int k = Math.min(delta, workQueue.size());
        while (k-- > 0 && addWorker(null, true)) {
            if (workQueue.isEmpty())
                break;
        }
    }
}

如何理解线程空闲

线程所谓的线程空闲并不是线程什么都不做。如果线程什么都不做执行完run方法就退出了,线程就会被回收。ThreadPool中的空闲是指线程被阻塞等待获取新的任务。

中断线程 interruptIdleWorkers() 只是向工作线程发送中断信号,
private void interruptIdleWorkers(boolean onlyOne) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        for (Worker w : workers) {
            Thread t = w.thread;
            if (!t.isInterrupted() && w.tryLock()) {
                try {
                    //发送中断信号 给getTask()方法中被阻塞的线程推送信号,
                    t.interrupt();
                } catch (SecurityException ignore) {
                } finally {
                    w.unlock();
                }
            }
            if (onlyOne)
                break;
        }
    } finally {
        mainLock.unlock();
    }
}

作用是 唤醒阻塞获取任务的线程,看getTask()的 catch (InterruptedException retry) ** 收到信号后,再次执行循环,通过cas操作减少核心线程数的值,compareAndDecrementWorkerCount(c)**。如果成功了则直接返回null跳出循序。而失败的则会再次for循序判断 boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;的值,因为核心线程数已经减少了这里判断为false 会执行线程阻塞的逻辑等待任务,从而保留了等于核心线程数的线程只删除多余的。

线程池拒绝策略
1. 什么时候会触发拒绝策略?

当最大线程全部启动且列队已经全部添满的时候,列队已经插入不进去了。会调用构造函数中传入的实现了RejectedExecutionHandler接口的方法

void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
2. 有几种实现方式
  1. AbortPolicy 中断,直接抛出异常 默认的实现方式
public static class AbortPolicy implements RejectedExecutionHandler {
    
    public AbortPolicy() { }
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        throw new RejectedExecutionException("Task " + r.toString() +
                                             " rejected from " +
                                             e.toString());
    }
}
  1. CallerRunsPolicy 直接在当前线程调用Runable方法

    public static class CallerRunsPolicy implements RejectedExecutionHandler {
        public CallerRunsPolicy() { }
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                r.run();
            }
        }
    }
    
  2. DiscardOldestPolicy 丢弃老的任务,然后执行新的任务。

    public static class DiscardOldestPolicy implements RejectedExecutionHandler {
        public DiscardOldestPolicy() { }
    
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                e.getQueue().poll();
                e.execute(r);
            }
        }
    }
    
  3. DiscardPolicy 丢弃任务 不做任何处理。

    public static class DiscardPolicy implements RejectedExecutionHandler {
        public DiscardPolicy() { }
    
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        }
    }
    
继承关系

Executor 一个执行了提交的Runable接口的任务的对象。 void execute(Runnable command);

ExecutorService 提供了任务的终止方法,且提供了返回Future对象用获取异步执行任务的结果。

AbstractExecutorService 抽象的实现类 实现了ExecutorService的部分方法没有实现 execute(Runnable command) 方法。

ThreadPoolExecutor 提供了默认的实现的实现

<img src="http://ww1.sinaimg.cn/large/006UamUGly1ghv79tte5mj30dg0hsmy8.jpg" alt="image-20200811171515611.png" style="zoom:50%;" />

上一篇下一篇

猜你喜欢

热点阅读