ThreadPoolExecutor线程池源码和典型问题

2020-06-19  本文已影响0人  唯爱_0834

Executor框架

线程队列

ArrayBlockingQueue : 基于数组结构的有界阻塞队列(FIFO) 源码附录在文章最下
PriorityBlockingQueue : 基于数组结构的优先级的无限阻塞队列
LinkedBlockingQueue : 基于链表结构的阻塞队列(FIFO)
SynchronousQueue : 不存储元素的阻塞队列(FIFO先进先出公平,或LIFO非公平)

Executor的运用

  1. Executors.callable(Runnable task):将Runnable对象封装为一个Callable对象
  2. ExecutorService.execute(Runnable task):执行一个没有返回值得任务
  3. ExecutorService.submit(Runnable或Callable task ):返回一个实现Future接口的对象FutureTask,
    1. 主线程可以执行FutureTask.get()方法等待任务执行完成获取接口
    2. 也可以调用FutureTask.cancel()来取消此任务的执行
ThreadPoolExecutor源码分析
  1. 首先创建线程池最终调取方法是:
        public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
    }
    //调取这个方法
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) 
    
    • corePoolSize : 核心线程数,创建线程池后默认线程为0,当有任务来时才会创建对应的一个线程,直到达到corePoolSize大小,就会将在到达的任务放到缓存队列中,除非调用了预创建线程才会在没有任务到达之前就创建对应的线程数;
    • maximumPoolSize: 最大线程数,线程池最多能创建的线程数,当队列满了以后再进来的任务会在创建一个新的线程
    • keepAliveTime : 线程没有任务执行时最多保持多久时间会终止.默认只有线程数大于corePoolSize时才起作用,针对的是最大线程数时创建的线程.有方法让线程数在核心线程之内也起作用,直到线程数为0
    • unit : 保持时间的参数单位
    • workQueue: 一个阻塞队列,用来存储等待执行的任务,当任务满了就会调用maximumPoolSize创建新线程
    • threadFactory : 用于设置创建线程的工厂,可以通过线程工厂给每个创建出来的线程命名或者设置优先级等操作:通过implements ThreadFactory可以自定义线程工厂
    • handler : 便是当拒绝处理任务时的策略(当阻塞队列满了,且达到最大线程数后再来任务会调取),通过implements RejectedExecutionHandler可自定义自己的拒绝策略
      1. AbortPolicy : 默认方式,直接抛出异常
      2. CallerRunsPolicy : 只用调用者所在线程来运行任务,异步任务变成了同步执行了,比如主线程调用的execute方法则拒绝策略会将线程池拒绝的任务交给主线程执行了
      3. DiscardOldestPolicy: 丢弃队列中对头的那个任务(最早添加进来的)并执行当前任务
      4. DiscardPolicy: 不处理,丢弃掉
ThreadPoolExecutor : FixedThreadPool, SingleThreadExecutor, CachedThreadPool
FixedTHreadPool(默认阻塞队列无界==> OOM)
  1. 创建一个固定线程数量nThreads的线程,核心线程跟最大线程数量一致都为nThreads
    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }
    
  2. 使用无界队列LinkedBlockingQueue: 使得最大线程数,跟超时时间都是无意义的,根据线程池运行步骤,队列不可能被填满,所以未执行shutdown()或shutdownNow()方法的运行中的FixedThreadPool不会拒绝任务
  3. 使用与为了满足资源管理的需求而限制当前线程数量,用于负载比较重的服务器
SingleThreadExecutor(默认阻塞队列无界==> OOM)
CachedThreadPool(默认核心线程0,最大线程i.MAX,synchronousQueue==>OOM)
  1. 创建一个大小无界的线程池,适用于执行很多的短期异步人物的小程序,或者负载比较轻量的服务器
    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }
    
  2. 分析: corePoolSize 设置为0 ,核心线程数为空,而最大线程数设置为Inter.MAX_VALUE,即非核心线程为无限的,同时空闲线程等待新任务最长为60s后被终止
  3. 同时: 使用没有容量的SynchronousQueue作为线程池的工作队列,这意味着如果主线程提交任务速度高于线程池中线程处理任务的速度,CachedThreadPool将会不断创建新线程,最终耗尽CPU和内存资源
  4. 执行步骤:
    1. 执行execute方法,首先执行SynchronousQueue的offer方法提交任务,并查询线程池中是否有空闲线程来执行其中的poll方法来移除任务,如果有,则配对成功,将任务交给这个空闲队列
    2. 否则,配对失败,将会创建一个新线程去处理任务
    3. 当线程池中线程空闲时,会执行synchronousQueue的poll方法等到其提交新任务,如果超过60s依然没有提交,则这个线程就会终止
    4. 由于非核心线程无界,所以一旦提交任务速度 > 线程池处理速度就会不断的创建新线程
    5. 因此:使用与每次提交任务都会有线程立刻进行处理的,大量,耗时少的任务,长时间保持空闲的CachedThreadPool将不会使用任何资源
    6. 其实就是主线程调用 offer方法跟没有容量的阻塞队列的poll方法是否适配,如果适配就使用该空闲线程,如果不适配就从新创建线程执行任务
ThreadPoolExecutor

ScheduledThreadPoolExecutor

  1. DelayQueue: 封装了一个优先级队列,会对队列中的ScheduledFutureTask进行排序,两个任务执行time不同时,time小的先执行,否则比较添加到队列中的ScheduledFutureTask的顺序号sequenceNumber,先提交的先执行
  2. 工作机制为:
    1. 调用上面两个方法添加一个任务
    2. 线程池中的线程从DelayQueue中取任务
    3. 然后执行任务
  3. 执行步骤为:
    1. 线程从DelayQueue中获取time大于等于当前时间的ScheduledFutureTask , DelayQueue.take()
    2. 执行完后修改这个task的time为下次被执行时间
    3. 然后在把这个task放回队列中DelayQueue.add()
  4. 使用场景: 用于需要多个后台线程执行周期任务,同时需要限制线程数量的场景

作业

  1. 自定义创建一个能够根据加入顺序得到最后一个数据的线程池,前面的没有完成就抛弃
  2. 自定义设置一个先达到最大队列工作,后缓存队列的线程池
  1. 单机上一个线程正在处理任务,如果突然断电了怎么办(正在处理和阻塞队列里的请求怎么处理)
    答: 可以对正在处理和阻塞队列的任务做事务管理或对阻塞队列中的任务持久化处理,并且当断电或系统崩溃,操作无法进行时,可以通过回溯日志的方式来撤销正在处理的已经执行成功的操作,然后重新执行整个阻塞队列
  2. 为什么不建议在代码中直接使用Executors创建线程池,而是推荐通过ThreadPoolExecutor方式创建?
  3. 答:不适用是可以明确的让我们知道线程池的运行规则,避免使用工具类的包装而不够直观内部机制导致潜在难以发现的问题,比如,使用newSingleThreadPool和FixedThreadPool创建线程池由于默认最大线程为Max而导致如果处理时间过长,任务过多而导致的OOM就很难发现问题

多线程中的问题

CountDownLatch与CyclicBarrier区别
线程池的选择使用
jstack追踪异常代码
  1. JPS常用命令整理
    • JPS是1.5提供的一个显示当前所有Java进程pid命令
    1. jps : 列出pid和Java主类名
    2. jps -l : 列出pid和Java主类全名
    3. jps -lm : 列出pid,主类全程和应用程序参数
    4. jps -v : 列出pid和JVM参数
  2. Jstack常用命令整理
    • jstack是JVM自带的一种堆栈追踪工具
    • jstack pid(通过jps获得的pid) 打印线程堆栈
线程中断机制
public class InterruptedTest extends Thread{

    public static void main(String[] args) {
        InterruptedTest interruptedTest = new InterruptedTest();
        interruptedTest.start();
    }


    @Override
    public void run() {
        super.run();
        MyThread myThread = new MyThread();

        myThread.start();

        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        //开始中断
        myThread.interrupt();


    }


    class MyThread extends Thread{

        @Override
        public void run() {
            super.run();
            //判断是否标记为中断:注意被异常捕获以后中断标记位会被重置为false,需要再次抛出方可中断
            while (!Thread.currentThread().isInterrupted()){

                System.out.println("我是正在运行...");
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) { //sleep是会有中断标记的因此会抛出interruptedException: 中断标志位会被清除,如果想让上方while退出,必须再次手动设置标志位
                    e.printStackTrace();
                    System.out.println("我收到了异常中断标记" + Thread.currentThread().isInterrupted()); // false:已经被异常重置了
                    interrupt(); //再次抛出
                    System.out.println("我收到了再次抛出中断标记" + Thread.currentThread().isInterrupted()); //true,可以正常中断
                }
            }
        }
    }
}

线程池线程复用原理
ThreadPoolExecutor
  1. 顶层接口Executor提供了一种思想:将任务提交和任务执行进行解耦。用户无需关注如何创建线程,如何调度线程来执行任务,用户只需提供Runnable对象,将任务的运行逻辑提交到执行器(Executor)中,由Executor框架完成线程的调配和任务的执行部分。
  2. ExecutorService接口增加了一些能力:(1)扩充执行任务的能力,补充可以为一个或一批异步任务生成Future的方法;(2)提供了管控线程池的方法,比如停止线程池的运行
  3. AbstractExecutorService则是上层的抽象类,将执行任务的流程串联了起来,保证下层的实现只需关注一个执行任务的方法即可
  4. ThreadPoolExecutor实现最复杂的运行部分,ThreadPoolExecutor将会一方面维护自身的生命周期,另一方面同时管理线程和任务,使两者良好的结合从而执行并行任务
线程池的生命周期管理
//使用高3位保存线程池的运行状态runState(总共5种3位足够了),低29位保存workerCount有效线程数量:使用同一位的原子类不用对两个变量操作时需要加锁操作了,直接使用一个原子类即可
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    //五种线程池状态
    private static final int RUNNING    = -1 << COUNT_BITS;
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    private static final int STOP       =  1 << COUNT_BITS;
    private static final int TIDYING    =  2 << COUNT_BITS;
    private static final int TERMINATED =  3 << COUNT_BITS;

    // Packing and unpacking ctl
    private static int runStateOf(int c)     { return c & ~CAPACITY; } //计算当前运行状态
    private static int workerCountOf(int c)  { return c & CAPACITY; } //计算当前线程数量
    private static int ctlOf(int rs, int wc) { return rs | wc; } //通过状态和线程数生成一个ctl合成数量是一个原子类
任务调度
  private final BlockingQueue<Runnable> workQueue; //线程阻塞队列
  public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) { //计算当前运行线程数小于核心线程数,对应上方2的情况
            if (addWorker(command, true)) //创建新的线程执行任务
                return;
            c = ctl.get();
        }
        //执行这里说明过了2那种情况,就看3的情况,阻塞队列可以正常添加任务
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get(); //添加成功了
            if (! isRunning(recheck) && remove(command)) //如果添加以后不再是运行状态,则移除刚才加入队列的任务
                reject(command);  //执行拒绝策略
            else if (workerCountOf(recheck) == 0) //线程池运行状态且wc运行线程数为0时
                addWorker(null, false); //创建新线程执行阻塞队列中的任务
        }
        else if (!addWorker(command, false)) //如果队列满了,可以正常走addWorker创建非核心线程即上方4, 如果false则是5
            reject(command); //走拒绝策略
    }
 private final HashSet<Worker> workers = new HashSet<>(); //线程池中的所有工作线程
private boolean addWorker(Runnable firstTask, boolean core) {
//根据当前状态,判断是否添加成功,上方执行方法中的addWorker两个参数firstTask = null ,core = true /false 具体分析
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c); //获取运行状态

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN && //状态 > shutDown 表示此时已经不再接受任务
            //shutdown状态不接受新任务,但可以执行已经加入队列中的任务,所以当进入shutdown状态,且传进来的任务为null时,并且任务队列不为null时,是允许添加新线程的,把这个条件取反就是不允许
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty())) 
                return false;

            for (;;) { //使用CAS操作避免加锁
                int wc = workerCountOf(c); //获取工作线程
                if (wc >= CAPACITY || 
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false; //大于线程最大容量2的29次方量(所以newCacheExecutor并不能得到Integer.MAX_Value的),或者大于最大允许线程量则不能添加啦
                if (compareAndIncrementWorkerCount(c)) //可添加就CAS操作线程数+1,成功说明可添加
                    break retry; //break跳出retry对应的循环,执行循环后面的添加worker逻辑
                c = ctl.get();  // Re-read ctl 重新读取状态
                if (runStateOf(c) != rs) 
                    continue retry; //状态改变了,跳到外层循环继续重新执行循环
                // else CAS failed due to workerCount change; retry inner loop
                //在内存层循环中不停的尝试CAS操作增加线程数
            }
        }
        //找了上方break retry可以正常使用CAS新增线程数
        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            w = new Worker(firstTask); //通过Worker包装runnable任务,稍后我们分析
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock(); //加锁
                try {
                
                    int rs = runStateOf(ctl.get());
                    //如果线程池状态rs < Shutdown即只能是Running
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) { //或者shutDown状态但是没有新任务
                        if (t.isAlive()) // 线程已经启动,并且当前没有任何异常的话,则是true,否则为false
                            throw new IllegalThreadStateException(); //我还没有启动呢
                        workers.add(w); //正常添加到线程池中workers工作线程
                        int s = workers.size();
                        if (s > largestPoolSize) //largestPoolSize:记录着线程池中出现过最大线程数量
                            largestPoolSize = s;
                        workerAdded = true; //可以正常工作的标记
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) { //如果正常工作,则开启线程任务
                    t.start();
                    workerStarted = true; //开始工作标记
                }
            }
        } finally {
            if (! workerStarted) //该任务没有开始,则添加到失败
                addWorkerFailed(w); 
        }
        return workerStarted;
    }
private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable //实现了Runnable接口,因此t.start()执行的就是worker的run方法啊
         {
     
        final Thread thread;
       
        Runnable firstTask;
        
        volatile long completedTasks;

        Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);  //创建thread(this:Worker) ,则t.start()调用worker的run,同时原来的Runnable被封装为Worker的属性firstTask
        }

        /** Delegates main run loop to outer runWorker  */
        public void run() {
            runWorker(this);
        }
        
    //getThreadFactory即为ThreadPoolExecutor创建thread工厂(实现ThreadFactory)可修改Thread名称,优先级等操作实现的
    public ThreadFactory getThreadFactory() {
        return threadFactory;
    }
final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask; //这个就是我们执行线程池executor.execute()方法时候的runnable
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
        //如果task不为null,并且从workQueue中获取任务不为null,则会一直执行下去
            while (task != null || (task = getTask()) != null) { //task是需要执行的任务,不一定是刚刚添加的那个了,这样其实worker线程并没有完成工作,自然也就不会销毁了
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) || //检查线程状态,若线程池处于中断状态,调用interrupt将线程中断
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt(); //中断线程
                try {
                    beforeExecute(wt, task); //可以在任务真正执行之前做点啥,空实现
                    Throwable thrown = null;
                    try {
                        task.run(); //执行execute()方法中的run方法,在t.start()线程内,这只是一个方法执行哈!
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown); //线程之后可以做啥,空实现
                    }
                } finally {
                    task = null;
                    w.completedTasks++; //该线程执行完成任务+1
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }
private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?

        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // 对应ShutDown虽然不添加任务,但是可以执行阻塞队列中的,Stop以后就不能子在执行任务了
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null; //返回null,停止执行任务
            }

            int wc = workerCountOf(c);

            // allowCoreThreadTimeOut 表示是否允许核心线程超时销毁,默认false不销毁.若设置成true,核心线程也会销毁的
            //只有正在工作的线程数大于核心线程数才会为true,佛足额返回false
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; //
        
            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }

            try {
            //如果timed为true(wx > 核心线程),通过poll取任务,如果为false,通过take取任务
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : //这两个参数就是创建线程池中保存时间量
                    workQueue.take();
                if (r != null) //如果有任务就退出死循环,返回任务交给上方的worker线程运行
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }
//ArrayBlockingQueue
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == 0) { //是否队列中的元素个数为0,说明空队列
                if (nanos <= 0L) //等待时间到了,队列中还未有数据加入,则返回null,
                    return null;
                /**
                * 调用该方法的前提是,当前线程已经成功获得与该条件对象绑定的重入锁,否* * 则调用该方法时会抛出IllegalMonitorStateException。
                * nanosTimeout指定该方法等待信号的的最大时间(单位为纳秒)。若指定时间* * 内收到signal()或signalALL()则返回nanosTimeout减去已经等待的时间;
                *若指定时间内有其它线程中断该线程,则抛出InterruptedException并清除当前线程的打断状态;
                * 若指定时间内未收到通知,则返回0或负数。 
                */
                nanos = notEmpty.awaitNanos(nanos);  //每次signal唤醒重新等待
            }
            return dequeue(); //如果有元素取出
        } finally {
            lock.unlock();
        }
    }
//如果poll超时返回null,则回调到
f ((wc > maximumPoolSize || (timed && timedOut)) //true
                && (wc > 1 || workQueue.isEmpty())) { //队列也是空的,走进去
                if (compareAndDecrementWorkerCount(c)) //CAS可以减少c的个数
                    return null; //返回了null,该线程不能再上方的while循环中继续获取就结束线程啦,非核心线程就over啦,嘿嘿!
                continue;
            }


public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == 0)  //不能使用if,避免虚假唤醒
                notEmpty.await();  //一旦count队列为空,会一致await阻塞在这里的,直到workQueue.offer()添加元素时唤醒
            return dequeue(); //取出队头元素
        } finally {
            lock.unlock();
        }
    }
//这个就是调用当前核心线程已经满了,则添加到阻塞队列中,
//刚刚上方的核心线程在等待任务,添加以后肯定就调用notEmpty.signal()唤醒等待线程取任务执行啦
if (isRunning(c) && workQueue.offer(command)) 
 public boolean offer(E e) {
        checkNotNull(e);
        final ReentrantLock lock = this.lock; //获取锁,跟上方加锁时同一把锁
        lock.lock();
        try {
            if (count == items.length)
                return false; //如果当前队列已满,不能再加入了false
            else {
                enqueue(e); //正常添加到队列中
                return true;
            }
        } finally {
            lock.unlock();
        }
    }
   
//enqueue添加到数组循环队列中后调用notEmpty.signal()唤醒一个await线程取任务开始工作啦!
private void enqueue(E x) {
        // assert lock.getHoldCount() == 1;
        // assert items[putIndex] == null;
        final Object[] items = this.items;
        items[putIndex] = x;
        if (++putIndex == items.length)
            putIndex = 0;
        count++;
        notEmpty.signal();
    }

补充一下阻塞队列的源码分析

ArrayDeque: 底层使用循环数组实现双向队列

add方法
remove 删除
修改和查找

PriorityQueue: 使用堆得优先级队列

构造函数
实现原理
添加 add/offer
获得队首元素
remove和poll
remove(Object o) :删除指定元素
注意:无论是队列还是栈都是可以用链表或者数组实现的,基本上所有数据都是这两种形式
上一篇 下一篇

猜你喜欢

热点阅读