javacode

java线程池解析

2020-02-26  本文已影响0人  九点半的马拉

什么是线程池

线程池实际上就是一个线程缓存集合,负责对线程进行统一分配、调优和调度。
线程是稀缺资源,它的创建与销毁是一个相对来说相对偏重且资源消耗的操作,而java线程依赖于内核线程,创建线程需要进行系统操作系统切换,为避免资源过度消耗需要重用线程执行多个线程。

线程池的好处

使用的场景
单个任务处理时间不较短;需要处理的任务数量很大。

线程池状态

线程池有五种状态,RunningShutdownstoptidyingTerminated

线程池的创建

线程池主要由ThreadPoolExecutor类来创建。

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
    }
    
public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler)      

构造函数参数介绍:
corePoolSize: 线程池核心线程最大值
maximumPoolSize: 线程池最多线程的大小(核心线程与非核心线程)
keepAliveTime: 线程池非核心线程空闲后的存活时间
unit: 存活时间的事件单位
workQueue: 存放任务的阻塞队列
threadFactory: 用于设置创建线程的工厂,可以给创建的线程起名字,可方便查看
handler: 当任务队列和线程池都满了,执行该饱和拒绝策略。默认的策略是AbortPolicy()
其他变量:
ctl可以通过CAS实现无锁并发,效率比较高,这个变量有两个作用

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;
private static int ctlOf(int rs, int wc) { return rs | wc; }

从上面可以看到ctl的最高三位表示线程池的状态,第29位表示线程池中现有的线程数,用最少的变量来减少锁竞争,提高并发效率。

饱和拒绝策略

存在四种策略

任务执行

整体流程

  1. 提交一个任务,线程池里存活的核心数小于线程出corePoolSize时,线程池会创建一个核心线程去处理提交的任务。
  2. 如果线程出核心数已经满了,即线程数等于corePoolSize,一个新提交的任务,会被放进任务队列workQueue中排队等待执行
  3. 如果任务队列也存放满了,判断线程数是否大于maximumPoolSize,即最大线程数是否已满,如果没达到,创建一个非核心线程执行提交的任务
  4. 如果当前的线程数已经到了maximumPoolSize,如果还有新的任务请求过来,直接采用饱和拒绝策略来处理。

线程池执行的入口是在execute()方法

public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
       //获取当前的ctl
        int c = ctl.get();
        //获取当前的线程数,如果小于核心线程数
        if (workerCountOf(c) < corePoolSize) {
            //添加一个核心线程,core=ture表示是创建核心线程
            if (addWorker(command, true))
                return;
            //添加任务失败,刷新ctl(在并发场景下,线程状态和线程数可能发生变化)
            c = ctl.get();
        }
        //线程池处于运行状态,并且当前任务放入到任务队列中
        if (isRunning(c) && workQueue.offer(command)) {
            //双重验证  
            //AtomicInteger里面的value是volatile类型,保持可见性
            int recheck = ctl.get();
            //线程池不再是运行状态,并且任务已经添加到队列中了,尝试使用remove             移除,如果移除成功,如果该任务进行开始执行了,就拒绝该任务
            if (! isRunning(recheck) && remove(command))
                reject(command);
            //如果线程数为0,创建一个非核心空线程,会自动从队列中获取任务来执行
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        //如果工作线程到了核心线程数,并且任务队列已经满了,就创建非核心线程进行         处理
        else if (!addWorker(command, false))
            //达到最大线程数,执行饱和拒绝策略
            reject(command);
    }

从上面看出要执行addWorker方法来创建线程

addWorker()

private boolean addWorker(Runnable firstTask, boolean core) {
        //相当于goto
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // 如果线程池状态时shutdown或之后的状态,绝大多数都是拒绝新的任务加入,但只有一种情况可以创建新线程
           // 那就是任务队列中还有未处理完的任务,而且不接受新任务(firstTask == null) ,此时添加新线程是为了加快等待队列中的任务,尽快让线程池关闭
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                int wc = workerCountOf(c);
                //core为true时表明是尝试创建核心线程,则用当前线程数与核心数比较,超过了则不能创建,为false时,则比较最大线程数
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                //通过CAS自旋,增加线程数+1,成功后跳出双层循环,执行后面的代码
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                // 检测当前线程池的状态,如果发生了变化,则跳到外循环,重新执行
                c = ctl.get();  
                if (runStateOf(c) != rs)
                    continue retry;
                //CAS添加失败,因为线程数发生了变化,重新进行内循环
                // else CAS failed due to workerCount change; retry inner loop
            }
        }
        //执行到该步骤,说明线程数已经成功加1了
        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            //创建新的线程
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                //加锁
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());
                    //只有当线程池是运行状态或者是shutdown状态且不是放入新任                     务,为了处理任务队列中剩余的任务
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        //线程只能start一次
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        //workers是一个HashSet,存储所有的worker
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    //该线程启动
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

从上面看到最后重要一步是线程的启动,这时就要用到Woker

private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
    {
        final Thread thread;
        //线程初始化时的第一个任务,可能为null,这时要从任务队列中获取
        Runnable firstTask;
        //该线程已经处理的任务数
        volatile long completedTasks;
        Worker(Runnable firstTask) {
            //在runWoker执行前不允许被中断
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }
        public void run() {
            runWorker(this);
        }
        // 0 表示处于未锁的状态, 1 表示加锁状态
        protected boolean isHeldExclusively() {
            return getState() != 0;
        }
        
        protected boolean tryAcquire(int unused) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        protected boolean tryRelease(int unused) {
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }
        //获取锁,不可重入
        public void lock()        { acquire(1); }
        public boolean tryLock()  { return tryAcquire(1); }
        //释放锁
        public void unlock()      { release(1); }
        //判断锁是否被独占
        public boolean isLocked() { return isHeldExclusively(); }
       //在shutdownNow()会调用该方法,当state大于等于0时才会被中断
        void interruptIfStarted() {
            Thread t;
            if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                }
            }
        }
    }

在线程运行时,会调用runWorker()方法启动线程
新创建一个worker,然后执行,如果worker中不包含要处理的任务,那么就从任务队列中获取,一旦一个woker启动了,如果任务队列不为空,则不会退出,一直循环

final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        //取出需要执行的任务
        Runnable task = w.firstTask;
        w.firstTask = null;
        //允许被中断
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            //当任务为空时到任务队列中取
            while (task != null || (task = getTask()) != null) {
               //这个lock是为了如果线程中断,拿到锁后会报中断异常,而退出循环
                w.lock();
                // 如果线程池状态大于等于Stop并且该线程未中断,则执行中断方法
                // 或者执行Thread.interrupted()方法来判断本线程是否中断并且清除中断状态
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }

从任务队列中获取需要调用runWorker方法

private Runnable getTask() {
        //表示获取任务是否已超时
        boolean timedOut = false; 
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // 当线程池装池状态大于等于STOP时,不能处理任何任务,正在处理的任务也要停止, 或者当线程池状态为shutdown并且任务队列为空
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
               //将线程数减1
                decrementWorkerCount();
                return null;
            }

            int wc = workerCountOf(c);

            // 是否启用超时机制,当运行核心线程超时或者当前线程数超过核心线程数
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
           //如果线程数超过线程池所允许的最大线程数获取启动超时机制下超时
            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }

            try {
              //如果启用超时机制就执行poll()方法,在规定时间内每获取到就返回null
             //其他情况执行take()方法,队列没有就一直阻塞到有任务
              Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                //因为超时没有获取到任务
                timedOut = true;
            } catch (InterruptedException retry) {
                //在执行take()方法中被中断不算超时
                timedOut = false;
            }
        }
    }

这里有个疑问,是怎么分别核心线程与非核心线程的?非核心线程在空闲一段时间后会关闭,是怎么实现的?
实际上核心线程和非核心线程都是一样的,并没有利用标记等措施来区分,在创建新线程时有一个变量core,当ture时,说明要创建核心线程,则用当前线程数与核心数比较,超过了则不能创建,需要将任务存放到任务队列中;为false时,则用当前线程数与最大线程数比较,大于时不能创建线程,执行饱和拒绝策略。一个线程会不断循环获取任务,如果在规定时间内获取不到任务,并且线程数大于核心线程数,就关闭该线程,知道等于核心线程数,核心线程一般是不能超时关闭的,除非开启了超时机制。
什么是不可中断特性?
一个线程获得锁后,另一个线程想要获得锁,必须处于阻塞或等待状态,如果第一个线程释放锁,第二个线程会一直阻塞或等待,不可被中断

几个设定好的线程池

newFixedThreadPool

public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

从上面可以看出线程池的核心线程数和最大线程数都是nThreads,即没有非核心线程,而它的任务队列是一个LinkedBlockingQueue,它的容量限制是Integer.MAX_VALUE,核心线程的keepAlive,不会有超时机制,这个方法使用于能估算出需要多少核心线程数量的场景
newSingleThreadExecutor
有且只有一个线程在工作,适合任务顺序执行,缺点是不能充分利用CPU多核性能

public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }

newCachedThreadPool
核心线程数是0,最大线程数是Integer.MAX_VALUE,线程keepAlive时间是60s,它比较适合处理执行时间比较小的任务,用的队列是SynchronousQueue,这个阻塞队列没有存储空间,这意味着只要有新任务,就必须找到一个工作线程,如果没有空闲的线程,就再创建一条新的线程

public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

ScheduledExecutorService
它是用来处理延时任务或定时任务
它接收SchduleFutureTask类型的任务,它采用DelayQueue存储等待的任务,DelayQueue内部封装了一个PriorityQueue,它会根据time的先后时间排序,若time相同则根据sequenceNumber排序,它也是一个无界队列
线程会从DelayQueue取已经到期的任务去执行,执行结束后重新设置任务的到期日期,再次放回DelayQueue

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
    }

题外话 创建线程的几种方式

  1. 继承Thread类,是Runnable接口的实现,重写run()方法
  2. 实现Runnable接口,并重写里面的run方法
  3. 利用线程池创建
  4. 实现Callable接口通过FutureTask来创建线程
    创建Callable接口的实现类,并实现了call()方法,然后创建该实现类的实例
    使用FutureTask类来包装Callbale对象,调用get()方法来获取子线程执行结束后返回的结果
    调用get方法会阻塞,必须等到自宣称结束后才会得到返回值
上一篇下一篇

猜你喜欢

热点阅读