Java多线程

Springboot | 线程池的学习,多线程池配置示例

2020-08-20  本文已影响0人  Ada54

一、线程和进程,线程的生命周期
二、单线程和多线程
三、线程池的概念
四、线程池的使用
五、多线程池配置示例

一、线程和进程,线程的生命周期

链接:https://www.jianshu.com/p/5ddcc068d177

二、单线程和多线程

单线程:只有一条线程在执行任务

多线程:多条线程同时在执行任务,比较常见的一种方式。

多线程的安全问题:
在多线程执行过程中,需要注意的是多线程的安全问题。因为多条线程同时执行任务,可能会出现同时访问同一个资源的情况,导致出错。所以需要进行同步互斥访问处理: 使用synchronized 同步代码块和 Lock 锁,只让一个线程访问资源,避免其他线程同时抢到CPU资源的执行权。

三、线程池的概念

线程池概念:简单的说,线程池是一个存放线程的容器。有任务时,从线程池里头取线程,不用的时候再放到池中给别的任务使用。

线程池使用目的
(1)避免反复创建和销毁线程带来的时间和内存消耗
(2)线程复用,提升响应速度,当有任务的时候,线程池直接调可用的线程进行执行,不需要等待线程的创建
(3)对线程进行统一的分配,提高线程的管理性

四、线程池的使用

4.1 线程池的创建

java中创建线程池的一个类:Executor,通常使用它的一个子类:ThreadPoolExecutor。

public ThreadPoolExecutor(
      int corePoolSize,  
      int maximumPoolSize,  
      long keepAliveTime,  
      TimeUnit unit,  
      BlockingQueue workQueue,  
      ThreadFactory threadFactory,  
      RejectedExecutionHandler handler)

线程池这几个参数的解释
(1)corePoolSize:线程池中的核心线程数量,即便是线程池里没有任何任务,也会有corePoolSize个线程在等任务
(2)maximumPoolSize:线程池中可以容纳的最大线程的数量(3)keepAliveTime:非核心线程可以保留的最长的空闲时间。当线程池里的线程数大于corePoolSize时,如果等了keepAliveTime时长还没有任务可执行,则线程退出。
(4)unit:用来指定keepAliveTime的单位,比如秒:TimeUnit.SECONDS
(5)workQueue:等待队列,任务可以储存在任务队列中等待被执行,采用FIFIO原则(先进先出)
(6)threadFactory:创建线程的线程工厂,主要是为了给线程起名字,默认工厂的线程名字:pool-1-thread-3。
(7)handler:线程池对拒绝任务(无线程可用)的处理策略,当线程池里线程被耗尽,且队列也满了的时候会调用

线程池的拒绝策略包括以下4种:
ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出异常
ThreadPoolExecutor.DiscardPolicy:丢弃任务,但是不抛出异常
ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新提交被拒绝的任务
ThreadPoolExecutor.CallerRunsPolicy:由调用线程(提交任务的线程)处理该任务

4.2 线程池的执行流程

线程池的执行流程图.jpg

step1:任务到达时,先判断当前线程数量是否小于核心线程数corePoolSize,若小于则创建线程来执行任务,否则将任务放入workQueue等待队列
step2:若workQueue等待队列满了,则判断当前线程数量是否小于最大线程数maximumPoolSize,若小于则创建线程执行任务,否则就会调用handler,线程池采用拒绝策略来处理任务。

五、多线程池配置示例

5.1 添加依赖

该依赖的作用是用于配置类和实体类的字段定位

        <!--用于配置类和实体类的字段定位-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-configuration-processor</artifactId>
            <optional>true</optional>
        </dependency>

5.2 定义基类,定义线程池基本参数

/**
 * 文件描述
 * 线程池基本参数
 * @author hjj
 * @date 2020年08月19日 15:43
 */
@Data
public class AsyncContants {
    /**
     * 核心线程数
     */
    private Integer corePoolSize=10;

    /**
     * 最大线程数
     */
    private Integer maxPoolSize=20;

    /**
     * 等待队列
     */
    private Integer queueCapacity=50;

    /**
     *线程池维护线程所允许的空闲时间,单位为秒
     */
    private Integer keepAliveSeconds=120;

}

5.3 配置文件中配置线程池基本参数值

#线程池配置
#第一个线程池
primary.async.corePoolSize=10
primary.async.maxPoolSize=20
primary.async.queueCapacity=50
primary.async.keepAliveSeconds=120
#第二个线程池
secondary.async.corePoolSize=20
secondary.async.maxPoolSize=40
secondary.async.queueCapacity=100
secondary.async.keepAliveSeconds=120

5.4 定义多个线程池配置类

读取配置文件,进行线程池配置
(1)添加@Component注解,声明该类为 Spring 组件,交由容器管理
(2)添加@ConfigurationProperties注解,声明该实体类对应的配置字段前缀
(3)继承AsyncContants基类

/**
 * 文件描述
 * 读取配置文件,进行线程池配置
 * @author hjj
 * @date 2020年08月19日 15:49
 */
@Component
@ConfigurationProperties(value = "primary.async")
public class PrimaryAsyncContants extends AsyncContants{
}
/**
 * 文件描述
 * 读取配置文件,进行线程池配置
 * @author hjj
 * @date 2020年08月19日 15:50
 */
@Component
@ConfigurationProperties(value = "secondary.async")
public class SecondaryAsyncConstants extends AsyncContants{
}

5.5线程池的自定义配置

将配置信息注入到线程池对象 ThreadPoolTaskExecutor 中,生成可用的 Executor 对象
(1)添加@Configuration注解
(2)添加@EnableAsync注解,开启异步任务
(3)在实例化方法上添加 @Bean 注解
getPrimaryAsyncTaskExecutor() 与 getSecondaryAsyncTaskExecutor() 方法分别实例化了对应的 Executor 对象,并通过注解 @Bean 将其纳入容器中
后续需要使用线程池,直接引用Bean名称

/**
 * 文件描述
 * 多个线程池自定义配置
 * @author hjj
 * @date 2020年08月19日 15:56
 */
@Configuration
@EnableAsync
public class AsyncTaskPoolConfig {

    private PrimaryAsyncContants primaryAsyncContants;
    private SecondaryAsyncConstants secondaryAsyncConstants;

    @Autowired(required = false)
    public AsyncTaskPoolConfig(PrimaryAsyncContants primaryAsyncContants, SecondaryAsyncConstants secondaryAsyncConstants){
        this.primaryAsyncContants = primaryAsyncContants;
        this.secondaryAsyncConstants = secondaryAsyncConstants;
    }

    @Bean(name = "primaryAsyncTaskExecutor")
    public Executor getPrimaryAsyncTaskExecutor(){
        return initExecutor(primaryAsyncContants,"primaryAsyncTaskExecutor-");
    }

    @Bean(name = "secondaryAsyncTaskExecutor")
    public Executor getSecondaryAsyncTaskExecutor(){
        return initExecutor(secondaryAsyncConstants,"secondaryAsyncTaskExecutor-");
    }

    private ThreadPoolTaskExecutor initExecutor(AsyncContants asyncContants,String prefix){
        ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
        threadPoolTaskExecutor.setCorePoolSize(asyncContants.getCorePoolSize());
        threadPoolTaskExecutor.setMaxPoolSize(asyncContants.getMaxPoolSize());
        threadPoolTaskExecutor.setQueueCapacity(asyncContants.getQueueCapacity());
        threadPoolTaskExecutor.setKeepAliveSeconds(asyncContants.getKeepAliveSeconds());
        threadPoolTaskExecutor.setThreadNamePrefix(prefix);
        // 线程池对拒绝任务(无线程可用)的处理策略
        threadPoolTaskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        return threadPoolTaskExecutor;
    }
}

5.6 编写异步任务的实现方法

(1)添加@Async注解,引用刚才定义的线程池Bean名称
(2)添加@Component注解,保证可以组件被扫描到
注:异步任务的实现方法要定义在一个类中,不能与调用它的方法写在同一个类中,不然不起效果

/**
 * 文件描述
 * 任务异步处理
 * @author hjj
 * @date 2020年07月22日 16:33
 */
@Component
@Slf4j
public class AsyncTask {

    @Async("primaryAsyncTaskExecutor")
    public CompletableFuture<String> getUserInfoByCompletableFuture(String userName) {
            String userInfo="";
            try{
                Thread.sleep(10);
                userInfo = userName+"的基本信息!";
                log.info("线程:"+Thread.currentThread().getName());
            } catch (InterruptedException e) {
                log.error(e.getMessage(),e);
                e.printStackTrace();
            }
        return CompletableFuture.completedFuture(userInfo);
    }
}

5.7 调用异步任务

/**
 * 文件描述
 *
 * @author hjj
 * @date 2020年07月22日 16:48
 */
@Component
@Slf4j
public class CompletableFutureDemo {
    @Autowired
    private AsyncTask asyncTask;

    List<String> batchGetUserInfoByCompletableFuture(List<String> userNameList) throws InterruptedException, ExecutionException{
        List<CompletableFuture<String>>  userInfoFutrues = userNameList.stream().map(userName->asyncTask.getUserInfoByCompletableFuture(userName)).collect(Collectors.toList());
        return userInfoFutrues.stream().map(CompletableFuture::join).collect(Collectors.toList());
    }
}

5.8 测试

 @Test
    public void CompletableFutureTest()throws InterruptedException, ExecutionException {
        List<String> userNameList = new ArrayList<>();
        for(int i=0;i<5;i++){
            userNameList.add("Ada"+i);
        }
        long start =System.currentTimeMillis();
        List<String> userInfoResult = completableFutureDemo.batchGetUserInfoByCompletableFuture(userNameList);
        long end =System.currentTimeMillis();
        log.info("CompletableFuture结果"+userInfoResult+"\nCompletableFuture耗时--》"+ (end-start)+"ms");
    }

从测试结果可以看到,使用了第一个线程池


图片.png

参考:
https://www.cnblogs.com/monkay/p/11170421.html

上一篇下一篇

猜你喜欢

热点阅读