美文共赏

掌握线程池7大核心参数,自己也可以手写线程池

2021-11-29  本文已影响0人  ludan110

手写线程池只需了解7个线程池核心参数

参数名 中文名 说明
corePoolSize 核心线程数 默认不会销毁,需要设置allowCoreThreadTimeOut为true时会销毁
maximumPoolSize 最大线程数量 线程数量要大于核心线程数,且不能小于等于0
keepAliveTime 空闲时间 最终存入的是nanos的值
unit 空闲时间单位 unit.tonanos(long time),最终转为纳秒值存入成员变量
workQueue 存放任务的队列 jdk实现的queue在初始化时需要设置队列长度
threadFactory 线程创建的工厂 默认会提供工厂,Executors.DefaultThreadFactory
handler 拒绝策略 默认拒绝策略为AbortPolicy,直接抛出异常

文章开头直接表格丢出七个核心参数,以及简单的说明。

如果已经是老司机,那么看到之后就能直接唤醒深处已经淡忘的记忆。

如果还是小萌新,不妨先简单的有个印象,等看完示例之后再深刻记忆。无论是对面试还是源码走读,或是线程池二开优化都会有帮助。

前导

手写线程池之前,你得知道线程池是什么,如何工作的。这样踩在Doug Lea的肩膀上你才能看的更远,走的更高。

如果你还不知道什么是线程池,可能本文不太适合你,看看我其他文章如线程池的前世今生、池化技术之后再来吧。

JDK中ThreeadPoolExecutor线程池的运行原理借用我生活中的例子来说明,帮助理解记忆。

故事

第零章

我是隶属于一个包工头(ThreadPoolExecutor)下的一个小员工,是一名Worker。曾经他手下一个员工都没有(默认线程数为0,没有线程启动),他自己也好吃懒作,从来不干活。

一次酒后,我们头儿吐露出他计划要招4个长工(coolPoolSize为4),而且以他的能力最多也只能管理6个下属(maximumPoolSize为6),那么也就说他最多也还会招2个临时工。他老婆兼职着Hr(threadFactory,理解为每一个线程都是经过她的手创建),每一个人入职都会发给我们一个安全帽,安全帽上标着编号,而我的编号是0x11。

第一章:第一个下属

有一天,一个老板遇见头儿,问他100块搬一块砖干不干,头儿说干。我们都知道的,他是不干活的。在下雨的公交站,只有我们两个人,他神秘的问我,你也是搬砖的吧,我有一个1块钱一块砖的活,你敢不敢?我想到我们老板以前只给一毛一块,我二话没说,成了他第一个员工(此时,核心线程数为1)。

第二章:干活

看着涨了10倍的工资,我的工作积极性被调动了起来。从来没有一块砖需要等我,一来砖我就搬完了,只有我等砖,没有砖等我。

第三章:两块砖

有一天,工地门口一下子来了两块砖。
我站在门口呆了,毕竟我一次只能搬一块砖,还有一块砖怎么办?工头来了,不知道他使出了什么技巧,一眨眼的时间,又拉到了一个工友,和我一起搬砖。不知道他是不是也是1块钱一块砖,我也没敢问。

第四章:两个人

活来的不是那么稳定。我和小王,对了忘记说了,那个新来的同事叫小王。小王和我一样,也是手脚麻利,干活卖力,一次来两块砖时,我们一人一块,如果只到了一块砖,我们看谁先抢到,谁就能赚这一块钱。幸好,我的手脚挺快,基本上一块砖的时候,我也能和他抢个55开。

第五章:平淡的生活

砖是一天天搬,工头的生意也越做越大,最多的时候能有一次四块砖。和小王来的那次一样,在门口岗亭那,工头和他们聊不到一支烟的功夫,我又多了两个工友。

第六章:门口的沙地

门口有一块沙地,我一直都没有注意到它。但是今天例外,因为今天一下来了五块砖,工头也没向往常一样出来解决。我知道他躺在那一定是看见了,因为我看到他故意的压了压帽檐,应该是想让我们以为他睡得比较死,帽子盖得严。

我们四个人一人搬走了一块砖,还剩下一块砖,它就静静的摆在沙地上(中间插一句,workQueue中插入了第一个任务,这毕竟还是一篇技术向的文章,别忘了正事)。它就躺在那块沙地上,我从来没有注意到的沙地,仿佛以前从未存在,但是今天它在了。沙地上还摆着我的罪恶感,不知道其他的工友是怎么想的,至少我是。我内心里从来不能忍受有砖是在等我搬的。明明只有我能等砖啊。

第七章:砖块

想聊的太多了,我好像都忘记介绍我搬的砖块了。这可不是普通的砖块,长宽高我不太有概念,只知道很重,那天工头跟我们说,这砖不耐压,最底下的砖要是再压上三块(共4块,workQueue的长度为4),如果再加一块就会把最底下的砖压坏。我甚至还听到一个比较恐怖的传说,门口那块沙地风水也不太好,要是沙地上一共压了五块砖,能把下面挖的地下室压塌,甚至整个工地也会倒掉。(队列中积压大对象过多,导致OOM)

这就是为什么我一次只能搬一块砖,这砖可不好搬呀。你也别说你来你也行。要是没有经过长期的搬砖训练,这砖你应该是搬不动的。

第八章:摇人

沙地上偶然会出现已经积压四块砖的情况,不过一般很快危险就被排除了。毕竟我们四个工友都是好样的,基本上不会让这种情况出现太久。

但是危险的事情总会出现,两辆自卸车带着第五块砖、第六块砖来了。门口也没有其他人,不管怎么样,第五块砖都会压上去吧。

然而,这件事我没有亲眼所见,是我工友告诉我的。

工友告诉我,这些自卸车到的几分钟前,工头就打了一个电话。也就等自卸车刚停稳,边上一辆小面包就过来了。面包车里跳下来两个人,一人搬起一块砖,就开始工作。

工头原来还会摇人,从没有见过,还刚好摇了两个人来搬超额的两块砖。我们私下都认为老板应该是事先知道今天肯定会来不及搬。

第九章:门房大爷

原来我不知道的事情还有这么多。门房大爷(RejectedExecutionHandler)是早在我之前,就已经和工头谈好的,帮忙看着那些砖,防止转压的过多。事情是这样的。
不是我们不勤快,是砖越来越多。即便我们六个人,也搬不过来,堆在沙地上的砖也一直到了四块。

我们一直都以为门房大爷和蔼可亲,和我们关系也老好了,不像是个社会人。但是那天自卸车到了,大家都没注意,第五块砖要压上来了,唯独大爷看到了。只见到大爷抄起菜刀脱掉安全帽带起钢盔冲到门口,大声嚷嚷挥舞菜刀赶走了自卸车和带着的砖块(默认AbortPolicy策略,直接抛异常,任务不处理)。原来大爷是这么一个大爷,有他在,这辈子是验证不了都市传说到底是不是真的了。

第十章:后疫情时代

疫情来了,送砖头的车也少了。我们知道,裁员肯定也是来了。

工头说他很人性化,他不做决断,让我们自己说了算。但是他定下规矩如果三天(时间单位TimeUnit.DAYS,时间keepAliveTime为3)一块砖都没搬,让我们自己选择离开吧。我还以为我们最开始四个不是临时工,不会被裁,原来我们都是。

工头只保证最后会留四个。

第十一章:内卷还在继续

虽然我们不拿底薪,按照搬砖的数量计薪。不过工头还是嫌我们人多活少碍事,在工地里瞎晃悠(线程还是存在进程中,多少占用了系统资源)。工头调整了规则(allowsCoreThreadTimeOuttrue,允许核心线程销毁),我们四个常任搬砖工也变成临时工,根据之前的规则要被淘汰了。
每一个人都为了一口饭,互相内卷着,希望不被开除。

手写线程池实现

终于开始自己个儿实现一个线程池了,思路就是模仿jdk中的线程池。或者说的再直白点,那就是默写jdk线程池的核心代码。

定义成员变量

/**
 * 线程池中用了一个ctl来实现, 不过自己实现的时候并不想写的这么复杂, 因此还是拆成两个变量, state与threadNum
 * 线程池状态,简单理解为-1是正常,大于0线程池已经异常, 简单起见,就没有做判断.
 */
private AtomicInteger state = new AtomicInteger(0);
/**
 * 线程数量
 */
private AtomicInteger threadNum = new AtomicInteger(0);
/**
 * 完成任务量
 */
private volatile int compliteNum = 0;

/**
 * 持有所有的线程
 */
private HashSet<Worker> workers = new HashSet<>();

/**
 * 缓存工作任务
 */
private BlockingQueue<Runnable> taskQueue;

/**
 * 线程池创建的工厂
 */
private volatile ThreadFactory threadFactory;

/**
 * 核心线程数
 */
private volatile int corePoolSize;

/**
 * 最大线程数
 */
private volatile int maximumPoolSize;

/**
 * 拒绝策略
 */
private MyRejectedExecutionHandler handler;

/**
 * 锁, 处理一些公用变量时需要加锁
 */
private final ReentrantLock mainLock = new ReentrantLock();

构造方法

只提供了一个三个参数的构造方法,其他参数都不是关键,所以直接设置了默认值。

/**
 * 提供一种构造方法, 只需要配置线程数和queue的容量
 * @param corePoolSize
 * @param maximumPoolSize
 * @param taskQueue
 */
public MyPool(Integer corePoolSize, Integer maximumPoolSize, BlockingQueue taskQueue) {
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.taskQueue = taskQueue;
    this.threadFactory = Executors.defaultThreadFactory();
    this.handler = new MyRejectedExecutionHandler();
}

核心内部类,worker

/**
 * 内部类, worker,持有线程,执行任务
 */
private class Worker implements Runnable {
    Thread thread;
    Runnable task;

    /**
     * 构造方法, 构造出work
     * @param task
     */
    public Worker( Runnable task) {
        this.task = task;
        this.thread = getThreadFactory().newThread(this);
    }

    @Override
    public void run() {
        System.out.println("覆盖run方法,对传入的任务做增强");
        runWorker(this);
    }
}

获取线程工厂的方法

线程工厂用的是jdk中自带的DefaultThreadFactory,在worker初始化时获取线程。

/**
 * 通过线程工厂创建线程
 * @return
 */
private ThreadFactory getThreadFactory() {
    return this.threadFactory;
}

自定义拒绝策略

自定义拒绝策略。

/**
 * 自定义拒绝策略, 并没有去继承jdk中的接口
 * 直接打印, 不做其他处理
 */
private class MyRejectedExecutionHandler {

    public void rejectedExecution(Runnable r, MyPool p) {
        System.out.println("任务过载,丢弃任务");
    }
}

在后续使用拒绝策略时,调用的方法:

private void reject(Runnable command) {
    handler.rejectedExecution(command, this);
}

向线程池提交任务

向线程池提交任务,在自己实现的线程池中,是为数不多的对外暴露的接口。

而在官方的线程池中,也几乎没有什么能直接操作到线程或者任务的API,对外暴露的多为管理线程池方面的API。

这个方法需要好好掌握,也是面试中问的最多的部分。

/**
 * 对外暴露的方法, 执行任务
 * @param r
 */
public void execute(Runnable r) {
    if (r == null) throw new NullPointerException();

    int n = threadNum.get();

    // 小于核心线程数
    if (n < corePoolSize) {
        addWorker(r, true);
        return;
    }
    // 队列未满, 未满时offer返回true
    if (taskQueue.offer(r)) {
        addWorker(null, false);
    }
    // 队列已满, 增加线程直到最大线程数为止
    else if (!addWorker(r, false)){
        // 执行拒绝策略
        reject(r);
    }

}

将任务交给worker内部类

execute中基本上只是一个中转,决定任务到底是由核心线程接收,还是队列接收,还是非核心线程接收或是拒绝策略接收。

在这里是要把任务添加给work,加入到works中,并且调用了线程的t.start()方法启动一个新的线程。

/**
 * 将任务添加到执行过程中
 * @param r
 * @param core
 */
private boolean addWorker(Runnable r, boolean core) {
    int num = threadNum.get();

    // 乐观锁
    for(;;) {
        // 线程数是否超出
        if (num >= (core ? corePoolSize : maximumPoolSize)) {
            return false;
        }

        if (threadNum.compareAndSet(num, num + 1)) {
            // 添加成功
            break;
        }
    }

    Worker w = null;
    boolean workerStarted = false;
    try {
        w = new Worker(r);
        Thread t = w.thread;

        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                workers.add(w);
            } finally {
                mainLock.unlock();
            }
            t.start();
            workerStarted = true;
        }
    } finally {

    }

    return workerStarted;
}

真正执行任务

/**
 * 线程由addWorker中t.start已启动, 启动之后将执行Worker中的run方法
 * 这个run方法传入的是worker对象,
 * 对普通的任务做增强, 也用于对于所执行的任务做前置后置的管理
 * @param w
 */
private void runWorker(Worker w) {
    Runnable task = w.task;

    try {
        // 一直获取任务
        while (task != null || (task = getTask()) != null) {

            try {
                beforeExecute();

                try {
                    task.run();

                } finally {
                    afterExecute();
                }
            } finally {
                // 演示用, 直接加锁,  统计任务完成数
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    compliteNum += 1;
                } finally {
                    mainLock.unlock();
                    task = null;
                }

            }
        }
    } finally {
        // 线程退出的后置操作
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            workers.remove(w);
            threadNum.decrementAndGet();
        } finally {
            mainLock.unlock();
        }

        if (threadNum.get() < corePoolSize) {
            addWorker(null, false);
        }
    }

}

获取任务,不让线程闲下来

/**
 * 从队列中获取任务
 * @return
 */
private Runnable getTask() {
    try {
        Runnable r = taskQueue.take();
        if (r != null) {
            return r;
        }
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return null;
}

前后置增强

private void afterExecute() {
    //System.out.println("操作后置增强");
}

private void beforeExecute() {
    //System.out.println("操作前置增强");
}

获取总共完成的任务数

/**
 * 获取总共完成的任务数
 * @return
 */
public int getCompliteNum() {
    return this.compliteNum;
}

运行示例

image

全部代码

public class MyPool {

    /**
     * 线程池中用了一个ctl来实现, 不过自己实现的时候并不想写的这么复杂, 因此还是拆成两个变量, state与threadNum
     * 线程池状态,简单理解为-1是正常,大于0线程池已经异常, 简单起见,就没有做判断.
     */
    private AtomicInteger state = new AtomicInteger(0);
    /**
     * 线程数量
     */
    private AtomicInteger threadNum = new AtomicInteger(0);
    /**
     * 完成任务量
     */
    private volatile int compliteNum = 0;

    /**
     * 持有所有的线程
     */
    private HashSet<Worker> workers = new HashSet<>();

    /**
     * 缓存工作任务
     */
    private BlockingQueue<Runnable> taskQueue;

    /**
     * 线程池创建的工厂
     */
    private volatile ThreadFactory threadFactory;

    /**
     * 核心线程数
     */
    private volatile int corePoolSize;

    /**
     * 最大线程数
     */
    private volatile int maximumPoolSize;

    /**
     * 拒绝策略
     */
    private MyRejectedExecutionHandler handler;

    /**
     * 锁, 处理一些公用变量时需要加锁
     */
    private final ReentrantLock mainLock = new ReentrantLock();

    /**
     * 提供一种构造方法, 只需要配置线程数和queue的容量
     * @param corePoolSize
     * @param maximumPoolSize
     * @param taskQueue
     */
    public MyPool(Integer corePoolSize, Integer maximumPoolSize, BlockingQueue taskQueue) {
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.taskQueue = taskQueue;
        this.threadFactory = Executors.defaultThreadFactory();
        this.handler = new MyRejectedExecutionHandler();
    }

    /**
     * 内部类, worker,持有线程,执行任务
     */
    private class Worker implements Runnable {
        Thread thread;
        Runnable task;

        /**
         * 构造方法, 构造出work
         * @param task
         */
        public Worker( Runnable task) {
            this.task = task;
            this.thread = getThreadFactory().newThread(this);
        }

        @Override
        public void run() {
            System.out.println("覆盖run方法,对传入的任务做增强");
            runWorker(this);
        }
    }

    /**
     * 通过线程工厂创建线程
     * @return
     */
    private ThreadFactory getThreadFactory() {
        return this.threadFactory;
    }

    /**
     * 自定义拒绝策略, 并没有去继承jdk中的接口
     * 直接打印, 不做其他处理
     */
    private class MyRejectedExecutionHandler {

        public void rejectedExecution(Runnable r, MyPool p) {
            System.out.println("任务过载,丢弃任务");
        }
    }

    /**
     * 对外暴露的方法, 执行任务
     * @param r
     */
    public void execute(Runnable r) {
        if (r == null) throw new NullPointerException();

        int n = threadNum.get();

        // 小于核心线程数
        if (n < corePoolSize) {
            addWorker(r, true);
            return;
        }
        // 队列未满, 未满时offer返回true
        if (taskQueue.offer(r)) {
            addWorker(null, false);
        }
        // 队列已满, 增加线程直到最大线程数为止
        else if (!addWorker(r, false)){
            // 执行拒绝策略
            reject(r);
        }

    }

    /**
     * 将任务添加到执行过程中
     * @param r
     * @param core
     */
    private boolean addWorker(Runnable r, boolean core) {
        int num = threadNum.get();

        // 乐观锁
        for(;;) {
            // 线程数是否超出
            if (num >= (core ? corePoolSize : maximumPoolSize)) {
                return false;
            }

            if (threadNum.compareAndSet(num, num + 1)) {
                // 添加成功
                break;
            }
        }

        Worker w = null;
        boolean workerStarted = false;
        try {
            w = new Worker(r);
            Thread t = w.thread;

            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    workers.add(w);
                } finally {
                    mainLock.unlock();
                }
                t.start();
                workerStarted = true;
            }
        } finally {

        }

        return workerStarted;
    }

    /**
     * 线程由addWorker中t.start已启动, 启动之后将执行Worker中的run方法
     * 这个run方法传入的是worker对象,
     * 对普通的任务做增强, 也用于对于所执行的任务做前置后置的管理
     * @param w
     */
    private void runWorker(Worker w) {
        Runnable task = w.task;

        try {
            // 一直获取任务
            while (task != null || (task = getTask()) != null) {

                try {
                    beforeExecute();

                    try {
                        task.run();

                    } finally {
                        afterExecute();
                    }
                } finally {
                    // 演示用, 直接加锁,  统计任务完成数
                    final ReentrantLock mainLock = this.mainLock;
                    mainLock.lock();
                    try {
                        compliteNum += 1;
                    } finally {
                        mainLock.unlock();
                        task = null;
                    }

                }
            }
        } finally {
            // 线程退出的后置操作
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                workers.remove(w);
                threadNum.decrementAndGet();
            } finally {
                mainLock.unlock();
            }

            if (threadNum.get() < corePoolSize) {
                addWorker(null, false);
            }
        }

    }

    /**
     * 从队列中获取任务
     * @return
     */
    private Runnable getTask() {
        try {
            Runnable r = taskQueue.take();
            if (r != null) {
                return r;
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return null;
    }

    private void afterExecute() {
        //System.out.println("操作后置增强");
    }

    private void beforeExecute() {
        //System.out.println("操作前置增强");
    }

    private void reject(Runnable command) {
        handler.rejectedExecution(command, this);
    }

    /**
     * 获取总共完成的任务数
     * @return
     */
    public int getCompliteNum() {
        return this.compliteNum;
    }

}

后记

工作中,手写线程池属实没什么用。jdk已经在底层实现了线程池的功能,我们只需要在此基础上封装即可。手写的意义在于能更细致的去精读里面的代码。获取你直接阅读,甚至是debug之后,你以为你已经掌握了。但是你用自己的语言,自己的理解再去实现一遍时。你会发现,还是有很多地方跟你想象中的不一样,有很多细节遗漏。因此即便到最后实现的线程池在一般流程下能运行,但是各种异常情况,各种容错都没有实现,这些异常的处理,已经边界条件的选择仍然需要精进。

上一篇下一篇

猜你喜欢

热点阅读