Java线程总结 之 ThreadPool 线程池

2020-07-27  本文已影响0人  TTLLong

线程脑图

多线程.png

ThreadPool 线程池

线程池的构造参数
1. corePoolSize:核心线程数
2. maximumPoolSize:最大线程数
3. keepAliveTime:当线程数,大于核心线程数时,多出来的线程所存活的时间,超过该时间将自动销毁
4. unit:keepAliveTime的单位(TimeUnit.SECONDS等)
5. workQueue:阻塞队列,用于存放超过核心线程数的Runnable。
6. threadFactory:创建线程的工厂类,实现了ThreadFactory接口。线程池的创建线程的逻辑,就是用了该接口的newThread方法
7. handler:拒绝策略,当超过队列的容量和最大线程数时。应当采取的策略。AbortPolicy,DiscardPolicy,DiscardOldPolicy,CallerRunsPolicy

---------------------------------------------------------------------线程池构造函数源码---------------------------------------------------------------------

    /**
     * Creates a new {@code ThreadPoolExecutor} with the given initial
     * parameters.
     *
     * @param corePoolSize the number of threads to keep in the pool, even
     *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
     * @param maximumPoolSize the maximum number of threads to allow in the
     *        pool
     * @param keepAliveTime when the number of threads is greater than
     *        the core, this is the maximum time that excess idle threads
     *        will wait for new tasks before terminating.
     * @param unit the time unit for the {@code keepAliveTime} argument
     * @param workQueue the queue to use for holding tasks before they are
     *        executed.  This queue will hold only the {@code Runnable}
     *        tasks submitted by the {@code execute} method.
     * @param threadFactory the factory to use when the executor
     *        creates a new thread
     * @param handler the handler to use when execution is blocked
     *        because the thread bounds and queue capacities are reached
     * @throws IllegalArgumentException if one of the following holds:<br>
     *         {@code corePoolSize < 0}<br>
     *         {@code keepAliveTime < 0}<br>
     *         {@code maximumPoolSize <= 0}<br>
     *         {@code maximumPoolSize < corePoolSize}
     * @throws NullPointerException if {@code workQueue}
     *         or {@code threadFactory} or {@code handler} is null
     */
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.acc = System.getSecurityManager() == null ?
                null :
                AccessController.getContext();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }
ThreadPoolExecutor工作原理
1. 如果工作线程数,小于设定的核心线程数则创建核心线程来执行任务
2. 如果核心数达到上限,将Runnable加入创建ThreadPoolExecutor时的阻塞队列(workQueue)
3. 阻塞队列达到容量上限,创建非核心线程,执行任务
4. 工作线程数量达到最大值(maximumPoolSize),执行拒绝策略(handler)

[图片上传失败...(image-aa80be-1595847718289)]

    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        /*
         * Proceed in 3 steps:
         *
         * 1. If fewer than corePoolSize threads are running, try to
         * start a new thread with the given command as its first
         * task.  The call to addWorker atomically checks runState and
         * workerCount, and so prevents false alarms that would add
         * threads when it shouldn't, by returning false.
         *
         * 2. If a task can be successfully queued, then we still need
         * to double-check whether we should have added a thread
         * (because existing ones died since last checking) or that
         * the pool shut down since entry into this method. So we
         * recheck state and if necessary roll back the enqueuing if
         * stopped, or start a new thread if there are none.
         *
         * 3. If we cannot queue task, then we try to add a new
         * thread.  If it fails, we know we are shut down or saturated
         * and so reject the task.
         */
         
         //ctl 为 AtomicInteger,用来记录当前线程状态和线程数量
        int c = ctl.get();
        // 如果工作线程数,小于设定的核心线程数则创建核心线程来执行该任务
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))//添加任务到核心线程 创建线程在addWorker中通过Work类执行了ThreadFactory的newThread()方法
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {//能够添加任务到队列中。若队列中的元素达到队列上限,offer方法将返回false
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))//如果当前线程池的状态为不可运行状态,就将刚才的任务从队列中移除,并对该任务执行拒绝策略
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))//添加任务到非核心线程
            reject(command);//如果失败,执行设置好的拒绝策略
    }
Executor框架:
ThreadPoolExecutor:
1. newSingleThreadExecutor:创建一个单线程的线程池,只有一个线程,存放任务的队列容量为Integer.Max
2. newFixedThreadPool:创建指定线程数N的线程池,核心线程数和最大线程数都为N,即线程数限制为N,存放任务的队列容量为Integer.Max.
3. newCachedThreadPool:创建一个线程池,该线程池根据需要创建新线程,但是将在可用时重新使用先前构造的线程。最大线程数为Integer.Max。存放任务的队列为,SynchronousQueue,它是一个不存储任务的阻塞队列,如果队列中有任务,则阻塞住请求的线程,直到创建新的线程,或者空闲的线程将这个任务取走执行。该线程池设置了keepAliveTime(60秒),空闲线程超出该规定时间就会自行销毁。
ScheduledThreadPoolExecutor:
可以理解未,能够执行延时任务的一个ThreadPoolExecutor。schedule方法可以设置执行任务时的延时时间。
ForkJoinPool:
ForkJoinPool采用工作窃取算法,将一个大任务根据阈值分割成很多个子任务,最后根据场景是否要合并子任务运算结果;
线程池状态:
阿里推荐的ThreadFactory是什么:
1. ThreadFactory是一个接口,里面有newThread方法。线程池里线程的创建,最终对是调用该方法创建的。
2. 线程池默认的DefaultThreadFactory:从代码中可以看出,线程池中默认的线程工厂类,给每个线程设置了group,name,非守护线程,以及线程优先级。我们可以模仿该类,实现自己的ThreadFactory

static class DefaultThreadFactory implements ThreadFactory {
    private static final AtomicInteger poolNumber = new AtomicInteger(1);
    private final ThreadGroup group;
    private final AtomicInteger threadNumber = new AtomicInteger(1);
    private final String namePrefix;

    DefaultThreadFactory() {
        SecurityManager s = System.getSecurityManager();
        group = (s != null) ? s.getThreadGroup() :
                              Thread.currentThread().getThreadGroup();
        namePrefix = "pool-" +
                      poolNumber.getAndIncrement() +
                     "-thread-";
    }

    public Thread newThread(Runnable r) {
        Thread t = new Thread(group, r,
                              namePrefix + threadNumber.getAndIncrement(),
                              0);
        if (t.isDaemon())
            t.setDaemon(false);
        if (t.getPriority() != Thread.NORM_PRIORITY)
            t.setPriority(Thread.NORM_PRIORITY);
        return t;
    }
}
shutDown和shutDownNow:
  1. shutDown:停止所有未阻塞的线程。源码interruptIdleWorkers()中的w.tryLock()。能请求到这把锁时,执行interrup()方法中断。
    public void shutdown() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();
            advanceRunState(SHUTDOWN);
            interruptIdleWorkers();
            onShutdown(); // hook for ScheduledThreadPoolExecutor
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
    }
    
    private void interruptIdleWorkers() {
        interruptIdleWorkers(false);
    }
    
    private void interruptIdleWorkers(boolean onlyOne) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (Worker w : workers) {
                Thread t = w.thread;
                if (!t.isInterrupted() && w.tryLock()) {
                    try {
                        t.interrupt();
                    } catch (SecurityException ignore) {
                    } finally {
                        w.unlock();
                    }
                }
                if (onlyOne)
                    break;
            }
        } finally {
            mainLock.unlock();
        }
    }
    
    
    //Work中的 trylock方法
    protected boolean tryAcquire(int unused) {
        if (compareAndSetState(0, 1)) {
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
        }
        return false;
    }
  1. shutdownNow:停止所有正在执行的任务(interrupt)。返回所有还没有执行的任务,并将还没有执行的任务,从队列中删除。
    public List<Runnable> shutdownNow() {
        List<Runnable> tasks;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();
            advanceRunState(STOP);
            interruptWorkers();
            tasks = drainQueue();
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
        return tasks;
    }
    
    /**
     * Interrupts all threads, even if active. Ignores SecurityExceptions
     * (in which case some threads may remain uninterrupted).
     */
    private void interruptWorkers() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (Worker w : workers)
                w.interruptIfStarted();
        } finally {
            mainLock.unlock();
        }
    }
    
    void interruptIfStarted() {
        Thread t;
        if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
            try {
                t.interrupt();
            } catch (SecurityException ignore) {
            }
        }
    }
拒绝策略:
概念:当队列的任务满了,运行的线程数也达到了最大线程数时,此时如果再来任务的话,就会按照用户的拒绝策略来执行。
1. AbortPolicy: 拒绝任务,直接抛异常
2. DiscardPolicy: 抛弃该请求任务
3. DiscardOldPolicy: 抛弃最久的没被处理的请求任务
4. CallerRunsPolicy: 由当前线程,执行此任务
上一篇 下一篇

猜你喜欢

热点阅读