java线程池(ThreadPoolExecuter)

2019-02-27  本文已影响0人  倔强韭菜

做事要有目标,看源码首先要确定需要追踪理解的问题,不然容易陷入逻辑泥潭中,理不清头绪。

因此本分结构分两部分

  1. 常见问题
  2. 源码分析,任务如何被执行?

不废话直奔主题(jdk 1.8.0 不同版本还是有不小差异)

(一)常见问题

  1. 为什么要使用线程池,有啥好处?
  2. 创建线程池时,构造函数中几个参数的作用?
  3. Executors中几种线程池的差异?

1. 为什么要使用线程池,有啥好处?

  1. 降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
  2. 提高响应速度。当任务到达时,任务可以不需要等到线程创建就能立即执行。
  3. 提高线程的可管理性。可以提供定时、定期、单线程、并发数控制等功能

2. 创建线程池时,构造函数中几个参数的作用?

线程池构造方法,但最终是调用这个方法。
一共七个参数。

    public ThreadPoolExecutor(int corePoolSize,      //核心线程池大小
                              int maximumPoolSize,   //线程池最大大小
                              long keepAliveTime,    //线程空闲活动时间 
                              TimeUnit unit,        //线程空闲活动时间单位
                              BlockingQueue<Runnable> workQueue,  //任务队列
                              ThreadFactory threadFactory,  //创建线程的工厂
                              RejectedExecutionHandler handler)  //拒绝策略
ArrayBlockingQueue // 是一个基于数组结构的有界阻塞队列,此队列按 FIFO(先进先出)原则对元素进行排序。
LinkedBlockingQueue //一个基于链表结构的阻塞队列,此队列按FIFO (先进先出) 排序元素,吞吐量通常要高于ArrayBlockingQueue。静态工厂方法Executors.newFixedThreadPool()使用了这个队列。
SynchronousQueue //一个不存储元素的阻塞队列。每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于LinkedBlockingQueue,静态工厂方法Executors.newCachedThreadPool使用了这个队列。
PriorityBlockingQueue //一个具有优先级的无限阻塞队列。
AbortPolicy:直接抛出异常。默认策略
CallerRunsPolicy:使用调用者所在线程来运行任务
DiscardOldestPolicy:丢弃队列里最近的一个任务,并执行当前任务
DiscardPolicy:不处理,丢弃掉

当然也可以根据应用场景需要来实现RejectedExecutionHandler接口自定义策略。

3.Executors中几种线程池的差异?

    /**
    * corePoolSize:nThreads
    * maximumPoolSize: nThreads
    * keepAliveTime:0
    * workQueue:LinkedBlockingQueue(容量:Integer.MAX_VALUE)
    * threadFactory:默认值
    * handler:默认值 
    */
    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }
    /**
    * corePoolSize:1
    * maximumPoolSize: 1
    * keepAliveTime:0
    * workQueue:LinkedBlockingQueue(容量:Integer.MAX_VALUE)
    * threadFactory:默认值
    * handler:默认值 
    */
    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }
    /**
    * corePoolSize:0
    * maximumPoolSize: Integer.MAX_VALUE
    * keepAliveTime:60s
    * workQueue:SynchronousQueue
    * threadFactory:默认值
    * handler:默认值 
    */
    public static ExecutorSersvice newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

(二)源码分析,任务如何被执行?

线程池状态(5个)

重要接口

public interface ThreadFactory {
    Thread newThread(Runnable r);
}

默认实现DefaultThreadFactory

   static class DefaultThreadFactory implements ThreadFactory {
        // 使用默认线程工厂的线程池数量
        private static final AtomicInteger poolNumber = new AtomicInteger(1);
        // 当前线程池中线程所属的线程组
        private final ThreadGroup group;
        // 当前线程池中累计创建的线程数量
        private final AtomicInteger threadNumber = new AtomicInteger(1);
        // 当前线程池中线程名称前缀
        private final String namePrefix;

        DefaultThreadFactory() {
            // 声明安全管理器
            SecurityManager s = System.getSecurityManager();
            // 获取线程组
            group = (s != null) ? s.getThreadGroup() :
                                  Thread.currentThread().getThreadGroup();
            // 线程名前缀,例如 "pool-1-thread-"                      
            namePrefix = "pool-" +
                          poolNumber.getAndIncrement() +
                         "-thread-";
        }

        public Thread newThread(Runnable r) {
            Thread t = new Thread(group, r,
                                  namePrefix + threadNumber.getAndIncrement(),
            // 设置线程t为前台线程                     0);
            if (t.isDaemon())
                t.setDaemon(false);
             // 设置线程t的优先级为NORM_PRIORITY  
            if (t.getPriority() != Thread.NORM_PRIORITY)
                t.setPriority(Thread.NORM_PRIORITY);
            return t;
        }
    }
public interface RejectedExecutionHandler {
    void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}

默认实现AbortPolicy

  public static class AbortPolicy implements RejectedExecutionHandler {
  
        public AbortPolicy() { }
        
        // 简单粗暴,直接抛出异常
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            throw new RejectedExecutionException("Task " + r.toString() +
                                                 " rejected from " +
                                                 e.toString());
        }
    }

重要成员变量

//任务缓存队列,用来存放等待执行的任务
private final BlockingQueue<Runnable> workQueue; 

//线程池的主要状态锁,对线程池状态(比如线程池大小、runState等)的改变都要使用这个锁
private final ReentrantLock mainLock = new ReentrantLock(); 

// 线程集合,访问时必须持有 mainLock
private final HashSet<Worker> workers = new HashSet<Worker>(); 

重要方法

  1. 当线程数 < 核心线程数(corePoolSize): 不管有没有线程空闲,直接创建新线程执行任务,。
  2. 当线程数 >= 核心线程数,且任务队列未满: 将任务放入任务队列(workQueue)。
  3. 当线程数 >= 核心线程数,且任务队列已满:若线程数 < 最大线程数(maximumPoolSize),创建新线程执行任务;若线程数 = 最大线程数(maximumPoolSize),使用拒绝策略(RejectedExecutionHandler)处理任务。
    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();

        // 当前线程数小于核心线程数量时,新建线程
        // 并将command作为首个任务开始执行
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        
        // 如果线程池处于RUNNING状态,并且任务添加
        // 到队列成功
        if (isRunning(c) && workQueue.offer(command)) {
            //方法执行过程中,线程池可能发成改变,
            // 需要double-check
            int recheck = ctl.get();
            // 线程池状态改变,非RUNNING,需要回滚
            if (! isRunning(recheck) && remove(command))
                reject(command);
            // 原有线程可能可能已经结束     
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        } 
        // 如果任务无法加入队列中,尝试新建非核心
        // 线程执行任务。如果新建失败,则使用拒绝
        // 策略处理任务
        else if (!addWorker(command, false))
            reject(command);
    }
  final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        // 创建线程时放入的首个任务
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
           // 循环, 从队列中取出等待执行的任务
           // 如果任务队列为空,超过keepAliveTime时长,
           // getTask()返回为空,此时线程就会销毁
            while (task != null || (task = getTask()) != null) {
                w.lock();
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    // 任务执行开始前调用,子类可重写此方法
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        // 任务执行结束后调用,子类可重写此方法
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            // 线程销毁
            processWorkerExit(w, completedAbruptly);
        }
    }
 private Runnable getTask() {
        boolean timedOut = false; 

        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }

            int wc = workerCountOf(c);

            // 允许核心线程销毁或者线程数量大于核心线程数量
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

            // 获取任务超时,线程数量减1,返回空
            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }

            try {
                // 从阻塞队列中获取任务设定超时时间
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }
上一篇 下一篇

猜你喜欢

热点阅读