Java并发(九) -- ThreadPoolExecutor源

2020-05-29  本文已影响0人  OakesYa

背景

当我们在项目中创建线程池时,可以用Executors里面封装好的newSingleThreadExecutor,newFixedThreadPool等线程池,但是阿里的java开发规范建议我们自己创建一个线程池,其实我们看Executors里面的线程池底层也依旧是使用的ThreadPoolExecutor类,为什么这么做呢,因为只有在自己new一个ThreadPoolExecutor对象过程中才会知道各个参数的意义,并选择合适的参数构建线程池。ThreadPoolExecutor的关系继承图如下


源码

ThreadPoolExecutor是一个内容很多的类,我们先看下他的完整构造函数,了解一个每个参数的含义

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

然后我们看下拒绝策略

    public static class AbortPolicy implements RejectedExecutionHandler {
      
        public AbortPolicy() { }

        /**
         * 直接抛出异常
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            throw new RejectedExecutionException("Task " + r.toString() +
                                                 " rejected from " +
                                                 e.toString());
        }
    }

    public static class CallerRunsPolicy implements RejectedExecutionHandler {
      
        public CallerRunsPolicy() { }

        /**
         * 线程池的线程数量达到上限,该策略会把任务队列中的任务放在调用者线程当中运行;
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                r.run();
            }
        }
    }

    public static class DiscardPolicy implements RejectedExecutionHandler {
      
        public DiscardPolicy() { }

        /**
         * 默默丢弃无法处理的任务,不予任何处理。
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        }
    }
    
    public static class DiscardOldestPolicy implements RejectedExecutionHandler {
      
        public DiscardOldestPolicy() { }

        /**
         * 丢弃任务队列中最老的一个任务,也就是当前任务队列中最先被添加进去的
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                e.getQueue().poll();
                e.execute(r);
            }
        }
    }

拒绝策略现在就这四种,当然我们也可以自己去实现RejectedExecutionHandler 接口,不过jdk提供的这四种目前足够使用了

执行

我们想看下线程具体到哪里执行的,先看下源码

    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        /*
         * Proceed in 3 steps:
         *
         * 1. 如果小于corePoolSize数的线程正在执行,那么当前就新建一个线程执行
         *
         * 2. 如果一个任务成功排队,我们仍然需要检查我们是否需要添加一个线程,
         *     因为上次检查后有的线程已经死亡或者线程池可能已经shutdown
         *
         * 3. 如果任务无法入队,那我们应该添加一个新线程,如果仍然失败,
         *     那我们知道线程池已经shutodown或者已满拒绝添加任务
         */
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }

    private final HashSet<Worker> workers = new HashSet<Worker>();
     
    private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                int wc = workerCountOf(c);
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());

                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

上面execute方法就是正式执行Runnable任务的方法,可以看到Runnable任务被封装成了worker并放入了一个HashSet中进行已创建线程的管理。

案例

public class ThreadPoolTest {
    private static ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("Test-Thread").build();
    private static ExecutorService threadPool = new ThreadPoolExecutor(1,5,2, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), threadFactory,  new ThreadPoolExecutor.AbortPolicy());

    public static void main(String[] args) {
        try {
            Future future1 = threadPool.submit(() -> System.out.println(Thread.currentThread().getName() + " 执行第一个"));
            Future future2 = threadPool.submit(() -> System.out.println(Thread.currentThread().getName() + " 执行第二个"));
            finishThreads(future1, future2);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    private static void finishThreads(Future... futures) throws Exception {
        for (Future future : futures) {
            future.get();
        }
    }
}

我们创建了一个线程池并执行了两个打印任务。

上一篇下一篇

猜你喜欢

热点阅读