Java并发编程之线程池

2022-11-22  本文已影响0人  宏势

使用线程池三个好处:1.降低资源消耗2.提高响应速度3.提高线程的可管理性

一、 线程池原理

线程池其实是使用Executor框架实现,涉及架构类图如下:

Executor.png

线程池调度过程如图所示:

thread.png
1.提交任务时,当前工作线程数小于核心线程数,执行器会创建一个任务线程,并执行当前任务
2.当工作线程数大于等于核心线程数,新任务加入阻塞队列,一直到阻塞队列满
3.当阻塞队列满了且核心线程数也用完了,会为新任务创建一个线程(非核心线程)
4.当线程总数超出maximumPoolSize且阻塞队列也满了,会为新任务执行拒绝策略

二、 线程池创建

Executors创建线程池的五种方式

ScheduledExecutorService scheduledService = Executors.newScheduledThreadPool(2);
SimpleDateFormat df = new SimpleDateFormat("yyyy:MM:dd HH:mm:ss");
scheduledExecutor.schedule(() -> System.out.println(df.format(new Date()) + ":一次性任务"), 4, TimeUnit.SECONDS);
scheduledExecutor.scheduleWithFixedDelay(() -> System.out.println(df.format(new Date())+":withFixedDalay"), 5, 3,TimeUnit.SECONDS);// 以上一次任务结束时间+延迟时间=下一次任务开始时间
scheduledExecutor.scheduleAtFixedRate(() -> System.out.println(df.format(new Date())+":atFixedDalay"), 5, 2,TimeUnit.SECONDS);//以上一次任务开始时间+延迟时间=下一次任务开始时间

newFixedThreadPoolnewSingleThreadExecutor: 阻塞队列无界,会堆积大量任务导致OOM(内存耗尽)
newCachedThreadPoolnewScheduledThreadPool: 线程数量无上限,会导致创建大量线程,从而导致OOM

建议直接使用线程池ThreadPoolExecutor的构造器

public ThreadPoolExecutor(int corePoolSize,  //核心线程数
                              int maximumPoolSize,   //最大线程数
                              long keepAliveTime,    //最大空闲时间 
                              TimeUnit unit,  //单位
                              BlockingQueue<Runnable> workQueue, //阻塞队列
                              ThreadFactory threadFactory,   //线程创建工厂
                              RejectedExecutionHandler handler)  //任务拒绝策略处理器

ThreadFactory(线程工厂)

可以通过自定义线程工厂更改所创建的新线程的名称、线程组、优先级、守护进程状态等,未指定工厂类默认使用内置工厂类DefaultThreadFactory

线程池拒绝策略

RejectedExecutionHandler 四个子类对应四种策略:

三、 线程池状态

threadpoolstate.png

四、 线程池任务

支持两种提交任务的方式:execute方法submit方法

void execute(Runnable command);
<T> Future<T> submit(Callable<T> task);  //支持异步获取返回
<T> Future<T> submit(Runnable task, T result); //支持异步获取返回值 result(任务会对result值进行修改)
Future<?> submit(Runnable task); //支持异步获取返回值null

利用线程生成一个随机值:

ExecutorService executorService = Executors.newFixedThreadPool(2);
Future<Integer> future = executorService.submit(() -> new Random().nextInt(1000));
System.out.println(future.get());

submit 方法本质上是创建了一个RunnableFuture, 源码如下:

    public Future<?> submit(Runnable task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<Void> ftask = newTaskFor(task, null);
        execute(ftask);
        return ftask;
    }
    public <T> Future<T> submit(Runnable task, T result) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task, result);
        execute(ftask);
        return ftask;
    }
    public <T> Future<T> submit(Callable<T> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task);
        execute(ftask);
        return ftask;
    }

Runnable、Callable 、Future、 FutureTask 区别

五、ForkJoin

利用分治思想,充分利用CPU资源,提升大任务的运算速度,适合那种CPU密集型的任务。比如JDK1.8的Stream基于ForkJoin实现。

ForkJoinPool由ForkJoinTask数组和ForkJoinWorkerThread数组组成,ForkJoinTask数组负责将存放程序提交给ForkJoinPool,ForkJoinWorkerThread负责执行这些任务。

public class ForkJoinPoolTest {
    private static long[] array = {2, 5, 9, 10, 11, 55, 11,90};
    private static ForkJoinPool forkJoinPool = new ForkJoinPool();
    public static class SumTask extends RecursiveTask<Long> {

        private int startIndex;
        private int endIndex;
        public SumTask(int startIndex, int endIndex){
            this.startIndex = startIndex;
            this.endIndex = endIndex;
        }
        @Override
        protected Long compute() {
            long sum = 0;
            if (endIndex - endIndex < 2) {
                for (int i = startIndex; i <= endIndex; i++) {
                    sum += array[i];
                }
            }else{
                int middle  = (startIndex + endIndex)/2;
                SumTask left = new SumTask(startIndex, middle);
                SumTask right = new SumTask(middle + 1, endIndex);
                left.fork();
                right.fork();
                sum = left.join() + right.join();
            }
            return sum;
        }
    }
    public static void main(String[] args) {
        System.out.println(forkJoinPool.invoke(new SumTask(0, 7)));
    }
}

六、自定义线程池(监控)

public class CustomThreadPoolTest {

    public static class CustomThreadPool extends ThreadPoolExecutor {

        private ThreadLocal<Long> time = new ThreadLocal<>();
        public CustomThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
        }
        @Override
        protected void beforeExecute(Thread t, Runnable r) {
            time.set(System.currentTimeMillis());
        }
        @Override
        protected void afterExecute(Runnable r, Throwable t) {
            System.out.println(String.format("%s : execute %s %s MS", Thread.currentThread().getName(), r.toString(), System.currentTimeMillis() - time.get()));//计算任务执行时长
        }
    }
    public static void main(String[] args) {
        CustomThreadPool customThreadPool = new CustomThreadPool(2, 2, 0, TimeUnit.SECONDS, new LinkedBlockingDeque());
        Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(() -> {
            System.out.println(String.format("%s : taskCount=%s, completeTaskCount=%s, largestPoolSize=%s, getPoolSize=%s, getActiveCount=%s", Thread.currentThread().getName(),
                    customThreadPool.getTaskCount(), customThreadPool.getCompletedTaskCount(), customThreadPool.getLargestPoolSize(), customThreadPool.getPoolSize(), customThreadPool.getActiveCount()));
        }, 0, 1, TimeUnit.SECONDS);
        for (int i = 1; i <= 5; i++) {
            final int tmp = i;
            customThreadPool.submit(()->{
                try {
                    Thread.sleep(tmp * 1000L);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }
    }
}
上一篇 下一篇

猜你喜欢

热点阅读