Java并发之线程池简介

2021-03-17  本文已影响0人  赵镇

为什么要有线程池

具体核心参数

线程工具类Executors可以创建的几种具体类型得线程池

首先要说明的是几个类之间的关系 Executor是 线程池ThreadExecutorPool执行的最终接口,具体来说是ThreadExecutorPool集成抽象类AbstractExecutorService,抽象类AbstractExecutorService实现 ExecutorService接口, ExecutorService继承Executor接口。然后Executors是创建几种具体线程池的工具类

newFixedThreadPool

使用该静态方法,创建的是固定线程大小的线程池。ExecutorService executorService = Executors.newFixedThreadPool(10);
源码如下

                //Executors.java
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
                //ThreadPoolExecutor.java
                   public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);

线程池的核心线程数和最大线程数都是初始化赋值时的线程数。keepAliveTime为0 表示非核心线程数执行完后立即销毁。但是在newFixedThreadPool 中核心线程数和最大线程数是相同的,即没有非核心线程。所以此处的keepAliveTime为多少都是没有作用的。
另外 此处使用的workQueue为LinkedBlockingQueue,LinkedBlockingQueue的队列最大数值为Integer.MAX_VALUE。因此随着任务队列的数据不断增多,很有可能会导致oom异常
使用如下测试代码进行测试

public class ThreadPool {
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(10);
        for (int i=0;i<Integer.MAX_VALUE;i++){
            executorService.execute(new Task());
        }

    }
}

class Task extends Thread{
    @Override
    public void run() {
        try {
            System.out.println(Thread.currentThread().getName());
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

vm options设置为-Xmx8m -Xms8m ;可以看到运行不就就会看到OOM异常


file

newCachedThreadPool

newCachedThreadPool从字面意思理解即为缓存的线程池,即是可以利用缓存的线程来执行后续的任务

        //Executors.java
    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }
        //

可以看到 线程池的几个参数中,核心线程数为0,但是最大线程数为Integer.MAX_VALUE,且任务队列为同步队列,只能容纳一个任务,这样就会导致当任务量太大时创建线程过多会直接OOM异常。但是当任务量小,任务执行不存在阻塞的情况时就可以复用之前缓存的线程.另外使用该线程执行完任务之后,由于核心线程数为0,但是keepAliveTime设置为60s.所以在无任务执行60s之后线程池将会退出。

复用情况

测试代码如下

public class CachedThreadPool {
    public static void main(String[] args) {
        ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
        for (int i=0;i<1000;i++){
            cachedThreadPool.execute(new Task());
        }

    }
}
class Task extends Thread{
    @Override
    public void run() {
        System.out.println(Thread.currentThread().getName());
    }
}

执行结果


file

可以看到在执行过程中线程存在复用的情况

OOM异常情况

将vm options 设置为-Xmx8m -Xms8m

public class CachedThreadPool {
    public static void main(String[] args) {
        ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
        for (int i=0;i<Integer.MAX_VALUE;i++){
            cachedThreadPool.execute(new Task());
        }

    }
}
class Task extends Thread{
    @Override
    public void run() {
        System.out.println(Thread.currentThread().getName());
        try {
            Thread.sleep(5000000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

结果如下图


file

在运行一段时间后出现OOM异常

newSingleThreadExecutor

newSingleThreadExecutor即为线程池中只创建了一个线程的线程池,创建源码如下

    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));

可以看到核心线程数和最大线程数均设置为1。任务队列使用的是无界的阻塞队列。这也就意味着当大量任务堆积在队列中时可能造成OOM异常
模拟OOM异常情况

public class SingleThreadPool {
    public static void main(String[] args) {
        ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
        for (int i=0;i<Integer.MAX_VALUE;i++){
            singleThreadExecutor.execute(new Task());
        }
    }
}
class Task extends Thread{
    @Override
    public void run() {
        System.out.println(Thread.currentThread().getName());
        try {
            Thread.sleep(5000000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

执行结果为


file

newScheduledThreadPool

newScheduledThreadPool即为计划的线程池,源码为

    public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue());
    }

可以看到jdk单独定义了一个具体的ScheduledThreadPoolExecutor类来完成这种定时任务的操作。可以看到最大线程数为Integer.MAX_VALUE,这样也可能因为线程数过多造成内存溢出。但是因为keepAliveTime为0,所以当线程执行完后立即停止运行。另外此处用的是一个优先队列,同样也是一个无界的阻塞队列。即也有可能因为队列任务过多造成OOM异常,另外也有可能因为最大线程数为Integer.MAX_VALUE,创建了太多的线程造成OOM异常。具体的造成OOM异常的代码可以参考上述几种线程池的代码进行尝试。有如下两种方法

newWorkStealingPool

newWorkStealingPool 是jdk1.8新增的一种线程池,newWorkStealingPool的源码为

    public static ExecutorService newWorkStealingPool(int parallelism) {
        return new ForkJoinPool
            (parallelism,
             ForkJoinPool.defaultForkJoinWorkerThreadFactory,
             null, true);
    }

底层并没有使用原生的ThreadPoolExecutor而是用的ForkJoinPool 来实现的。 ForkJoinPool的优势在于,可以充分利用多cpu,多核cpu的优势,把一个任务拆分成多个“小任务”分发到不同的cpu核心上执行,执行完后再把结果收集到一起返回。

public class WorkStealingPool {
    public static void main(String[] args) throws InterruptedException {
        ExecutorService  workStealingPool = Executors.newWorkStealingPool(2);
        for (int i = 1; i < 10; i++) {
            int finalI = i;
            workStealingPool.execute(() -> {
                try {
                    System.out.println(finalI);
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }
        Thread.sleep(10000);//这里让主线程等待子线程执行完毕,也可以使用计数器的方式
        workStealingPool.shutdown();
    }
}

总结

上文主要介绍了几种线程池的用法,后续文章会继续探讨自定义线程池以及线程池的使用

欢迎关注和点赞,以及总结的分类面试题https://github.com/zhendiao/JavaInterview

上一篇 下一篇

猜你喜欢

热点阅读