Java并发 --- 线程池解析

2021-04-26  本文已影响0人  _code_x

线程池是什么,有什么好处?简述线程池中线程复用原理?

线程是一个重资源,JVM 中的线程与操作系统的线程是一对一的关系,所以在 JVM 中每创建一个线程就需要调用操作系统提供的 API 创建线程,赋予资源,并且销毁线程同样也需要系统调用。而系统调用就意味着上下文切换等开销,并且线程也是需要占用内存的,而内存也是珍贵的资源。因此线程的创建和销毁是一个重操作(系统调用),并且线程本身也占用资源。

总结补充:

线程池概述:

生产者 - 消费者模型

生产者-消费者设计模式

为什么要使用生产者消费者模式?

使用线程池的好处:

线程池线程复用原理:在线程池中,同一个线程可以从阻塞队列中不断获取新任务来执行。

创建线程池的方法有哪些?有哪些问题?自定义线程池注意点?

先回忆一下线程创建的方法:

但是我们工作中一般不这样来创建线程。原因:虽然在 Java 语言中创建线程看上去就像创建一个对象一样简单,只需要 new Thread() 就可以了,但实际上创建线程远不是创建一个对象那么简单。创建对象,仅仅是在 JVM 的堆里分配一块内存而已;而创建一个线程,却需要调用操作系统内核的 API,然后操作系统要为线程分配一系列的资源,这个成本就很高了,所以线程是一个重量级的对象,应该避免频繁创建和销毁。线程池线程复用刚好可以解决这一个问题。

ps:阿里开发手册:【强制】线程资源必须通过线程池提供,不允许在应用中自行显式创建线程。

从jdk1.5版本开始,在java.uitl.concurrent包下面定义定义了一些与并发相关的类,其中线程池最核心的一个类是ThreadPoolExecutor。

通过 Executors 的静态工厂方法创建线程池的四种方法:

阿里开发手册:【强制】线程池不允许使用Executors去创建,而是通过ThreadPooLExecutor的方式,这样的处理方式让写的同学更加明确线程池的运行规则(避免上述四种方法的默认实现),规避资源耗尽的风险。说明:Executors返回的线程池对象的弊端如下:

最终都有可能导致OOM,而 OOM 会导致所有请求都无法处理,这是致命问题。所以强烈建议使用有界队列,并设置最大线程数!

如何合理自定义线程池ThreadPoolExecutor:上述四种线程池的最终方式也是调用的ThreadPoolExecutor的构造方法:

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
                          long keepAliveTime, TimeUnit unit,
                          BlockingQueue<Runnable> workQueue) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
         Executors.defaultThreadFactory(), defaultHandler);
}

创建线程池有哪些参数(构造函数中参数)?

public satatic class CallerRunsPolicy implements RejectExecutionHandler{
    public CallerRunsPolicy(){}
    
    public void rejectedExecution(Runnable r,ThreadPoolExecutor e){
        if(!e.isShutdown()){
            r.run();
        }
    }
}
    public static class AbortPolicy implements RejectedExecutionHandler{
        public AbortPolicy(){}
        
        public void rejectedExecution(Runnable r,ThreadPoolExecutor e){
            throw new RejectedExecutionException("Task " + r.toString() +
                                                 " rejected from " +
                                                 e.toString());
        }
    }
    public static class DiscardPolicy implements RejectedExecutionHandler {

        public DiscardPolicy() { }

        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        }
    }
public static class DiscardOldestPolicy implements RejectExecutionHandler{
    public DiscardOldestPolicy(){}
    
    public void rejectedExecution(Runnable r,ThreadPoolExecutor e){
        if(!e.isShutdown()){
            e.getQueue().poll();
            e.execute(r);
        }
    }
}

ps:还可以自定义实现(实现RejectExecutionHandler接口即可)和第三方实现的拒绝策略,比如Netty。

Netty中的实现很像JDK中的CallerRunsPolicy(调用者运行策略),舍不得丢弃任务。不同的是,CallerRunsPolicy是直接在调用者线程执行的任务。而Netty是新建了一个线程来处理的。所以,Netty的实现相较于调用者执行策略的使用面就可以扩展到支持高效率高性能的场景了。但是也要注意一点,Netty的实现里,在创建线程时未做任何的判断约束,也就是说只要系统还有自由就会创建新的线程来处理,直到new不出新的线程了,才会抛创建线程失败的异常

private static final class NewThreadRunsPolicy implements RejectedExecutionHandler {
    NewThreadRunsPolicy(){
        super();
    }
    
    public void rejectedExecution(Runnable r,ThreadPoolThread e){
        try{
            final Thread t = new Thread(r,"Temporary task executor");
            t.start();
        }catch(Throwable e){
            throw new RejectedExecutionException("Failed to start a new thread ",e);
        }
    }
}

线程池都有哪几种工作队列

线程池处理任务的流程?(或execute() 方法的执行机制?)

当提交一个新的任务到线程池时,线程池的处理流程如下:

ps:线程池创建线程时,会将线程封装成工作线程 Worker,Worker 在执行完任务后还会循环获取工作队列中的任务来执行。

特别注意

向线程池提交任务的execute()和submit()方法的区别?

线程池中阻塞队列的作用?为什么是先添加队列而不是先创建最大线程?

阻塞队列作用

添加队列而不是先创建最大线程原因:

线程池有几种状态吗?

如何关闭线程池?

可以调用 shutdown 或 shutdownNow 方法关闭线程池,原理是遍历线程池中的工作线程,然后逐个调用线程的 interrupt 方法中断线程,无法响应中断的任务可能永远无法终止。

通常调用 shutdown 来关闭线程池,如果任务不一定要执行完可调用 shutdownNow。

手写一个简易的线程池

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class TreadPoolDemo {

    /** 利用阻塞队列实现生产者-消费者模式 */
    BlockingQueue<Runnable> workQueue;

    /** 保存内部工作线程 */
    List<WorkThread> workThreadList = new ArrayList<>();

    /** 构造函数传入核心线程数和阻塞队列 */
    TreadPoolDemo(int poolSize, BlockingQueue<Runnable> workQueue) {
        this.workQueue = workQueue;
        for (int i = 0; i < poolSize; i++) {
            WorkThread workThread = new WorkThread();
            workThread.start();
            workThreadList.add(workThread);
        }
    }

    /** 放入任务,如果没有空间,则阻塞 */
    void execute(Runnable command) {
        try {
            workQueue.put(command);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    /** 自定义工作线程 */
    class WorkThread extends Thread {

        @Override
        public void run() {
            // 循环取任务并执行
            while (true) {
                Runnable task = null;
                try {
                    // 获取阻塞队列的第一个任务,并删除
                    // 如果没有元素,则会阻塞等待
                    task = workQueue.take();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                task.run();
            }
        }
    }

    public static void main(String[] args) {

        BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>(5);
        TreadPoolDemo pool = new TreadPoolDemo(2, workQueue);

        for (int i = 0; i < 10; i++) {
            int num = i;
            pool.execute(()->{
                System.out.println("线程 " + num + " 执行");
            });
        }
    }
}

执行结果:

线程 0 执行
线程 1 执行
线程 2 执行
线程 4 执行
线程 3 执行
线程 5 执行
线程 6 执行
线程 7 执行
线程 8 执行
线程 9 执行

巨人的肩膀:

https://blog.csdn.net/wolf909867753/article/details/77500625
https://blog.csdn.net/Kurry4ever_/article/details/109294661
https://blog.csdn.net/zj57356498318/article/details/102579980
https://mp.weixin.qq.com/s/mpKipT8tEmSCDUWi_lj-Bw
https://blog.csdn.net/qq_29373285/article/details/85238728
https://blog.csdn.net/zzti_erlie/article/details/91999145

上一篇下一篇

猜你喜欢

热点阅读