JavaJavaalready

万字长文:带你透彻理解“线程池”

2022-03-09  本文已影响0人  马小莫QAQ

目标

什么是线程池

线程池其实就是一种多线程处理形式,处理过程中可以将任务添加到队列中,然后在创建线程后自动启动这些任务。这里的线程就是我们前面学过的线程,这里的任务就是我们前面学过的实现了Runnable或Callable接口的实例对象;

为什么使用线程池

使用线程池最大的原因就是可以根据系统的需求和硬件环境灵活的控制线程的数量,且可以对所有线程进行统一的管理和控制,从而提高系统的运行效率,降低系统运行运行压力;当然了,使用线程池的原因不仅仅只有这些,我们可以从线程池自身的优点上来进一步了解线程池的好处;

使用线程池有哪些优势

  1. 线程和任务分离,提升线程重用性;
  2. 控制线程并发数量,降低服务器压力,统一管理所有线程;
  3. 提升系统响应速度,假如创建线程用的时间为T1,执行任务用的时间为T2,销毁线程用的时间为T3,那么使用线程池就免去了T1和T3的时间;

线程池应用场景介绍

  1. 网购商品秒杀
  2. 云盘文件上传和下载
  3. 12306网上购票系统等

总之,只要有并发的地方、任务数量大或小、每个任务执行时间长或短的都可以使用线程池;

只不过在使用线程池的时候,注意一下设置合理的线程池大小即可;(关于如何合理设置线程池大小在后面的章节中讲解)

Java内置线程池原理剖析

(源码演示在idea中查看)

ThreadPoolExecutor部分源码

构造方法:

public ThreadPoolExecutor(
        int corePoolSize, //核心线程数量
        int maximumPoolSize,//     最大线程数
        long keepAliveTime, //       最大空闲时间
        TimeUnit unit,         //        时间单位
        BlockingQueue<Runnable> workQueue,   //   任务队列
        ThreadFactory threadFactory,    // 线程工厂
        RejectedExecutionHandler handler  //  饱和处理机制
 ) 
{ ... }

ThreadPoolExecutor参数详解

我们可以通过下面的场景理解ThreadPoolExecutor中的各个参数:

线程池工作流程总结示意图

自定义线程池-参数设计分析

通过观察Java中的内置线程池参数讲解和线程池工作流程总结,我们不难发现,要设计一个好的线程池,就必须合理的设置线程池的4个参数;那到底该如何合理的设计4个参数的值呢?我们一起往下看.

4个参数的设计:

1.核心线程数(corePoolSize)

核心线程数的设计需要依据任务的处理时间和每秒产生的任务数量来确定,例如:执行一个任务需要0.1秒,系统百分之80的时间每秒都会产生100个任务,那么要想在1秒内处理完这100个任务,就需要10个线程,此时我们就可以设计核心线程数为10;

当然实际情况不可能这么平均,所以我们一般按照8020原则设计即可,既按照百分之80的情况设计核心线程数,剩下的百分之20可以利用最大线程数处理;

2.任务队列长度(workQueue)

任务队列长度一般设计为:核心线程数/单个任务执行时间*2即可;例如上面的场景中,核心线程数设计为10,单个任务执行时间为0.1秒,则队列长度可以设计为200;

3.最大线程数(maximumPoolSize)

最大线程数的设计除了需要参照核心线程数的条件外,还需要参照系统每秒产生的最大任务数决定:例如:上述环境中,如果系统每秒最大产生的任务是1000个,那么,最大线程数=(最大任务数-任务队列长度)单个任务执行时间;既: 最大线程数=(1000-200)0.1=80个;

4.最大空闲时间(keepAliveTime)

这个参数的设计完全参考系统运行环境和硬件压力设定,没有固定的参考值,用户可以根据经验和系统产生任务的时间间隔合理设置一个值即可;

自定义线程池-实现步骤

  1. 编写任务类(MyTask),实现Runnable接口;
  2. 编写线程类(MyWorker),用于执行任务,需要持有所有任务;
  3. 编写线程池类(MyThreadPool),包含提交任务,执行任务的能力;
  4. 编写测试类(MyTest),创建线程池对象,提交多个任务测试;

MyTask

package com.itheima.demo01;
/*
    需求:
        自定义线程池练习,这是任务类,需要实现Runnable;
        包含任务编号,每一个任务执行时间设计为0.2秒
 */
public class MyTask implements Runnable{
    private int id;
    //由于run方法是重写接口中的方法,因此id这个属性初始化可以利用构造方法完成

    public MyTask(int id) {
        this.id = id;
    }

    @Override
    public void run() {
        String name = Thread.currentThread().getName();
        System.out.println("线程:"+name+" 即将执行任务:"+id);
        try {
            Thread.sleep(200);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("线程:"+name+" 完成了任务:"+id);
    }

    @Override
    public String toString() {
        return "MyTask{" +
                "id=" + id +
                '}';
    }
}

MyWorker

package com.itheima.demo01;

import java.util.List;

/*
    需求:
        编写一个线程类,需要继承Thread类,设计一个属性,用于保存线程的名字;
        设计一个集合,用于保存所有的任务;
 */
public class MyWorker extends Thread{
    private String name;//保存线程的名字
    private List<Runnable> tasks;
    //利用构造方法,给成员变量赋值

    public MyWorker(String name, List<Runnable> tasks) {
        super(name);
        this.tasks = tasks;
    }

    @Override
    public void run() {
       //判断集合中是否有任务,只要有,就一直执行任务
        while (tasks.size()>0){
            Runnable r = tasks.remove(0);
            r.run();
        }
    }
}

MyThreadPool

package com.itheima.demo01;

import java.util.Collections;
import java.util.LinkedList;
import java.util.List;

/*
    这是自定义的线程池类;

    成员变量:
        1:任务队列   集合  需要控制线程安全问题
        2:当前线程数量
        3:核心线程数量
        4:最大线程数量
        5:任务队列的长度
    成员方法
        1:提交任务;
            将任务添加到集合中,需要判断是否超出了任务总长度
        2:执行任务;
            判断当前线程的数量,决定创建核心线程还是非核心线程
 */
public class MyThreadPool {
    // 1:任务队列   集合  需要控制线程安全问题
    private List<Runnable> tasks = Collections.synchronizedList(new LinkedList<>());
    //2:当前线程数量
    private int num;
    //3:核心线程数量
    private int corePoolSize;
    //4:最大线程数量
    private int maxSize;
    //5:任务队列的长度
    private int workSize;

    public MyThreadPool(int corePoolSize, int maxSize, int workSize) {
        this.corePoolSize = corePoolSize;
        this.maxSize = maxSize;
        this.workSize = workSize;
    }

    //1:提交任务;
    public void submit(Runnable r){
        //判断当前集合中任务的数量,是否超出了最大任务数量
        if(tasks.size()>=workSize){
            System.out.println("任务:"+r+"被丢弃了...");
        }else {
            tasks.add(r);
            //执行任务
            execTask(r);
        }
    }
    //2:执行任务;
    private void execTask(Runnable r) {
        //判断当前线程池中的线程总数量,是否超出了核心数,
        if(num < corePoolSize){
            new MyWorker("核心线程:"+num,tasks).start();
            num++;
        }else if(num < maxSize){
            new MyWorker("非核心线程:"+num,tasks).start();
            num++;
        }else {
            System.out.println("任务:"+r+" 被缓存了...");
        }
    }

}

MyTest

package com.itheima.demo01;
/*
    测试类:
        1: 创建线程池类对象;
        2: 提交多个任务
 */
public class MyTest {
    public static void main(String[] args) {
        //1:创建线程池类对象;
        MyThreadPool pool = new MyThreadPool(2,4,20);
        //2: 提交多个任务
        for (int i = 0; i <30 ; i++) {
            //3:创建任务对象,并提交给线程池
            MyTask my = new MyTask(i);
            pool.submit(my);
        }
    }
}

Java内置线程池-ExecutorService介绍

ExecutorService接口是java内置的线程池接口,通过学习接口中的方法,可以快速的掌握java内置线程池的基本使用 常用方法:

Java内置线程池-ExecutorService获取

获取ExecutorService可以利用JDK中的Executors 类中的静态方法,常用获取方式如下:

newCachedThreadPool

package com.test;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

/*
    练习Executors获取ExecutorService,然后调用方法,提交任务;
 */
public class MyTest01 {
    public static void main(String[] args) {
//        test1();
        test2();
    }
    //练习newCachedThreadPool方法
    private static void test1() {
        //1:使用工厂类获取线程池对象
        ExecutorService es = Executors.newCachedThreadPool();
        //2:提交任务;
        for (int i = 1; i <=10 ; i++) {
            es.submit(new MyRunnable(i));
        }
    }
    private static void test2() {
        //1:使用工厂类获取线程池对象
        ExecutorService es = Executors.newCachedThreadPool(new ThreadFactory() {
            int n=1;
            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r,"自定义的线程名称"+n++);
            }
        });
        //2:提交任务;
        for (int i = 1; i <=10 ; i++) {
            es.submit(new MyRunnable(i));
        }
    }
}
/*
    任务类,包含一个任务编号,在任务中,打印出是哪一个线程正在执行任务
 */
class MyRunnable implements Runnable{
    private  int id;
    public MyRunnable(int id) {
        this.id = id;
    }

    @Override
    public void run() {
        //获取线程的名称,打印一句话
        String name = Thread.currentThread().getName();
        System.out.println(name+"执行了任务..."+id);
    }
}

newFixedThreadPool

package com.test;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

/*
    练习Executors获取ExecutorService,然后调用方法,提交任务;
 */
public class MyTest02 {
    public static void main(String[] args) {
        //test1();
        test2();
    }
    //练习方法newFixedThreadPool
    private static void test1() {
        //1:使用工厂类获取线程池对象
        ExecutorService es = Executors.newFixedThreadPool(3);
        //2:提交任务;
        for (int i = 1; i <=10 ; i++) {
            es.submit(new MyRunnable2(i));
        }
    }
    private static void test2() {
        //1:使用工厂类获取线程池对象
        ExecutorService es = Executors.newFixedThreadPool(3,new ThreadFactory() {
            int n=1;
            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r,"自定义的线程名称"+n++);
            }
        });
        //2:提交任务;
        for (int i = 1; i <=10 ; i++) {
            es.submit(new MyRunnable2(i));
        }
    }
}

/*
    任务类,包含一个任务编号,在任务中,打印出是哪一个线程正在执行任务
 */
class MyRunnable2 implements Runnable{
    private  int id;
    public MyRunnable2(int id) {
        this.id = id;
    }

    @Override
    public void run() {
        //获取线程的名称,打印一句话
        String name = Thread.currentThread().getName();
        System.out.println(name+"执行了任务..."+id);
    }
}

newSingleThreadExecutor

package com.test;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

/*
    练习Executors获取ExecutorService,然后调用方法,提交任务;
 */
public class MyTest03 {
    public static void main(String[] args) {
        //test1();
        test2();
    }
    //练习方法newFixedThreadPool
    private static void test1() {
        //1:使用工厂类获取线程池对象
        ExecutorService es = Executors.newSingleThreadExecutor();
        //2:提交任务;
        for (int i = 1; i <=10 ; i++) {
            es.submit(new MyRunnable3(i));
        }
    }
    private static void test2() {
        //1:使用工厂类获取线程池对象
        ExecutorService es = Executors.newSingleThreadExecutor(new ThreadFactory() {
            int n=1;
            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r,"自定义的线程名称"+n++);
            }
        });
        //2:提交任务;
        for (int i = 1; i <=10 ; i++) {
            es.submit(new MyRunnable3(i));
        }
    }
}

/*
    任务类,包含一个任务编号,在任务中,打印出是哪一个线程正在执行任务
 */
class MyRunnable3 implements Runnable{
    private  int id;
    public MyRunnable3(int id) {
        this.id = id;
    }

    @Override
    public void run() {
        //获取线程的名称,打印一句话
        String name = Thread.currentThread().getName();
        System.out.println(name+"执行了任务..."+id);
    }
}

练习Executors获取ExecutorService,测试关闭线程池的方法;

package com.test;

import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

/*
    练习Executors获取ExecutorService,测试关闭线程池的方法;
 */
public class MyTest04 {
    public static void main(String[] args) {
        test1();
//        test2();
    }
    //练习方法newFixedThreadPool
    private static void test1() {
        //1:使用工厂类获取线程池对象
        ExecutorService es = Executors.newSingleThreadExecutor();
        //2:提交任务;
        for (int i = 1; i <=10 ; i++) {
            es.submit(new MyRunnable4(i));
        }
        //3:关闭线程池,仅仅是不再接受新的任务,以前的任务还会继续执行
        es.shutdown();
        //es.submit(new MyRunnable4(888));//不能再提交新的任务了
    }
    private static void test2() {
        //1:使用工厂类获取线程池对象
        ExecutorService es = Executors.newSingleThreadExecutor(new ThreadFactory() {
            int n=1;
            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r,"自定义的线程名称"+n++);
            }
        });
        //2:提交任务;
        for (int i = 1; i <=10 ; i++) {
            es.submit(new MyRunnable4(i));
        }
        //3:立刻关闭线程池,如果线程池中还有缓存的任务,没有执行,则取消执行,并返回这些任务
        List<Runnable> list = es.shutdownNow();
        System.out.println(list);
    }
}

/*
    任务类,包含一个任务编号,在任务中,打印出是哪一个线程正在执行任务
 */
class MyRunnable4 implements Runnable{
    private  int id;
    public MyRunnable4(int id) {
        this.id = id;
    }

    @Override
    public void run() {
        //获取线程的名称,打印一句话
        String name = Thread.currentThread().getName();
        System.out.println(name+"执行了任务..."+id);
    }

    @Override
    public String toString() {
        return "MyRunnable4{" +
                "id=" + id +
                '}';
    }
}

Java内置线程池-ScheduledExecutorService

ScheduledExecutorService是ExecutorService的子接口,具备了延迟运行或定期执行任务的能力,

常用获取方式如下:

ScheduledExecutorService常用方法如下

newScheduledThreadPool的schedule

package com.test.demo3;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/*
    测试ScheduleExecutorService接口中延迟执行任务和重复执行任务的功能
 */
public class ScheduleExecutorServiceDemo01 {
    public static void main(String[] args) {
        //1:获取一个具备延迟执行任务的线程池对象
        ScheduledExecutorService es = Executors.newScheduledThreadPool(3);
        //2:创建多个任务对象,提交任务,每个任务延迟2秒执行
        for (int i=1;i<=10;i++){
            es.schedule(new MyRunnable(i),2, TimeUnit.SECONDS);
        }
        System.out.println("over");
    }
}
class MyRunnable implements Runnable{
    private int id;

    public MyRunnable(int id) {
        this.id = id;
    }

    @Override
    public void run() {
        String name = Thread.currentThread().getName();
        System.out.println(name+"执行了任务:"+id);

    }
}

scheduleAtFixedRate方法

package com.test.demo3;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

/*
    测试ScheduleExecutorService接口中延迟执行任务和重复执行任务的功能
 */
public class ScheduleExecutorServiceDemo02 {
    public static void main(String[] args) {
        //1:获取一个具备延迟执行任务的线程池对象
        ScheduledExecutorService es = Executors.newScheduledThreadPool(3, new ThreadFactory() {
            int n = 1;
            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r,"自定义线程名:"+n++);
            }
        });
        //2:创建多个任务对象,提交任务,每个任务延迟2秒执行
         es.scheduleAtFixedRate(new MyRunnable2(1),1,2,TimeUnit.SECONDS);
        System.out.println("over");
    }
}

class MyRunnable2 implements Runnable{
    private int id;

    public MyRunnable2(int id) {
        this.id = id;
    }

    @Override
    public void run() {
        String name = Thread.currentThread().getName();
        try {
            Thread.sleep(1500);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(name+"执行了任务:"+id);
    }
}

Java内置线程池-异步计算结果(Future)

我们刚刚在学习java内置线程池使用时,没有考虑线程计算的结果,但开发中,我们有时需要利用线程进行一些计算,然后获取这些计算的结果,而java中的Future接口就是专门用于描述异步计算结果的,我们可以通过Future 对象获取线程计算的结果;

Future 的常用方法如下:

package com.itheima.demo04;

import java.util.concurrent.*;

/*
    练习异步计算结果
 */
public class FutureDemo {
    public static void main(String[] args) throws Exception {
        //1:获取线程池对象
        ExecutorService es = Executors.newCachedThreadPool();
        //2:创建Callable类型的任务对象
        Future<Integer> f = es.submit(new MyCall(1, 1));
        //3:判断任务是否已经完成
        //test1(f);
        boolean b = f.cancel(true);
        //System.out.println("取消任务执行的结果:"+b);
        //Integer v = f.get(1, TimeUnit.SECONDS);//由于等待时间过短,任务来不及执行完成,会报异常
        //System.out.println("任务执行的结果是:"+v);
    }
    //正常测试流程
    private static void test1(Future<Integer> f) throws InterruptedException, ExecutionException {
        boolean done = f.isDone();
        System.out.println("第一次判断任务是否完成:"+done);
        boolean cancelled = f.isCancelled();
        System.out.println("第一次判断任务是否取消:"+cancelled);
        Integer v = f.get();//一直等待任务的执行,直到完成为止
        System.out.println("任务执行的结果是:"+v);
        boolean done2 = f.isDone();
        System.out.println("第二次判断任务是否完成:"+done2);
        boolean cancelled2 = f.isCancelled();
        System.out.println("第二次判断任务是否取消:"+cancelled2);
    }
}
class MyCall implements Callable<Integer>{
    private int a;
    private int b;
    //通过构造方法传递两个参数

    public MyCall(int a, int b) {
        this.a = a;
        this.b = b;
    }

    @Override
    public Integer call() throws Exception {
        String name = Thread.currentThread().getName();
        System.out.println(name+"准备开始计算...");
        Thread.sleep(2000);
        System.out.println(name+"计算完成...");
        return a+b;
    }
}

综合案例-秒杀商品

案例介绍:

假如某网上商城推出活动,新上架10部新手机免费送客户体验,要求所有参与活动的人员在规定的时间同时参与秒杀挣抢,假如有20人同时参与了该活动,请使用线程池模拟这个场景,保证前10人秒杀成功,后10人秒杀失败;

要求:

思路提示:

代码步骤:

package com.itheima.demo05;
/*
    任务类:
        包含了商品数量,客户名称,送手机的行为;
 */
public class MyTask implements Runnable {
    //设计一个变量,用于表示商品的数量
    private static int id = 10;
    //表示客户名称的变量
    private String userName;

    public MyTask(String userName) {
        this.userName = userName;
    }

    @Override
    public void run() {
        String name = Thread.currentThread().getName();
        System.out.println(userName+"正在使用"+name+"参与秒杀任务...");
        try {
            Thread.sleep(200);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        synchronized (MyTask.class){
            if(id>0){
                System.out.println(userName+"使用"+name+"秒杀:"+id-- +"号商品成功啦!");
            }else {
                System.out.println(userName+"使用"+name+"秒杀失败啦!");
            }
        }
    }
}
package com.itheima.demo05;

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/*
    主程序类,测试任务类
 */
public class MyTest {
    public static void main(String[] args) {
        //1:创建一个线程池对象
        ThreadPoolExecutor pool = new ThreadPoolExecutor(3,5,1, TimeUnit.MINUTES,new LinkedBlockingQueue<>(15));
        //2:循环创建任务对象
        for (int i = 1; i <=20 ; i++) {
            MyTask myTask = new MyTask("客户"+i);
            pool.submit(myTask);
        }
        //3:关闭线程池
        pool.shutdown();
    }
}

案例介绍:

设计一个程序,使用两个线程模拟在两个地点同时从一个账号中取钱,假如卡中一共有1000元,每个线程取800元,要求演示结果一个线程取款成功,剩余200元,另一个线程取款失败,余额不足;

要求:

思路提示:

package com.itheima.demo06;

public class MyTask implements Runnable {
    //用户姓名
    private String userName;
    //取款金额
    private double money;
    //总金额
    private static double total = 1000;

    public MyTask(String userName, double money) {
        this.userName = userName;
        this.money = money;
    }

    @Override
    public void run() {
        String name = Thread.currentThread().getName();
        System.out.println(userName+"正在准备使用"+name+"取款:"+money+"元");
        try {
            Thread.sleep(200);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        synchronized (MyTask.class){
            if(total-money>0){
                System.out.println(userName+"使用"+name+"取款:"+money+"元成功,余额:"+(total-money));
                total-=money;
            }else {
                System.out.println(userName+"使用"+name+"取款:"+money+"元失败,余额:"+total);
            }
        }
    }
}
package com.itheima.demo06;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

public class MyTest {
    public static void main(String[] args) {
        //1:创建线程池对象
        ExecutorService pool = Executors.newFixedThreadPool(2, new ThreadFactory() {
            int id = 1;

            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, "ATM" + id++);
            }
        });
        //2:创建两个任务并提交
        for (int i = 1; i <=2 ; i++) {
            MyTask myTask = new MyTask("客户" + i, 800);
            pool.submit(myTask);
        }
        //3:关闭线程池
        pool.shutdown();
    }
}

线程池总结

线程池的使用步骤可以归纳总结为五步 :

  1. 利用Executors工厂类的静态方法,创建线程池对象;
  2. 编写Runnable或Callable实现类的实例对象;
  3. 利用ExecutorService的submit方法或ScheduledExecutorService的schedule方 法提交并执行线程任务
  4. 如果有执行结果,则处理异步执行结果(Future)
  5. 调用shutdown()方法,关闭线程池

作者:Java知音
原文链接:https://mp.weixin.qq.com/s/IACTJi-ZXFOdaaiLjEmVTQ

上一篇下一篇

猜你喜欢

热点阅读