一些收藏

Java并发编程 线程池

2020-03-04  本文已影响0人  香沙小熊

1. 线程池的自我介绍

线程池的重要性
什么是池
复用我们的线程
控制我们资源的总量

如果不使用线程池,每个任务都新开一个线程处理
public class EveryTaskOneThread {
    public static void main(String[] args) {
        for (int i = 0; i < 10; i++) {
            Thread thread = new Thread(new Task());
            thread.start();
        }
    }

    static class Task implements Runnable {
        @Override
        public void run() {
            System.out.println("执行了任务");
        }
    }
}

这样开销太大,我们希望有固定数量的线程,来执行者1000个线程,这样就避开了反复创建并销毁线程所带来的开销问题。

为什么要使用线程池?

1.反复创建开销大
2.过多的线程会占用太多内存
解决以上两个问题的思路

线程池的好处
线程池适合应用的场景

2. 创建和停止线程池

线程池构造函数的参数
public ThreadPoolExecutor(
                              int corePoolSize,        //第1个参数
                              int maximumPoolSize, //第2个参数
                              long keepAliveTime, //第3个参数
                              TimeUnit unit, //第4个参数
                              BlockingQueue<Runnable> workQueue, //第5个参数
                              ThreadFactory threadFactory, //第6个参数
                              RejectedExecutionHandler handler) { //第7个参数
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 || 
            maximumPoolSize < corePoolSize || 
            keepAliveTime < 0) // 第一处
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)//第二处
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }
corePoolSize

表示常驻核心线程数。如果等于0,则任务执行完成后,没有任何请求进入时销毁线程池的线程;如果大于0,即使本地任务执行完毕,核心线程也不会被销毁。这个值的设置非常关键,设置过大会浪费资源,设置的过小会导致线程频繁地创建或销毁。

maximumPoolSize

表示线程池能够容纳同时执行的最大线程数。从上方的示例代码中第一处来看,必须大于或等于1。如果待执行的线程数大于此值,需要借助第5个参数的帮助。缓存在队列中。如果maximumPoolSize 与corePoolSize 相等,即是固定大小线程池。

keepAliveTime

表示线程池中的线程空闲时间,当空闲时间达到KeepAliveTime 值时,线程被销毁,直到剩下corePoolSize 个线程为止,避免浪费内存和句柄资源。在默认情况下,当线程池的线程大于corePoolSize 时,keepAliveTime 才会起作用。但是ThreadPoolExecutor的allowCoreThreadTimeOut 变量设置为ture时,核心线程超时后也会被回收。

TimeUnit

表示时间单位。keepAliveTime 的时间单位通常是TimeUnit.SECONDS。

workQueue

表示缓存队列。当请求的线程数大于maximumPoolSize时,线程进入BlockingQueue 阻塞队列。
有3种最常见的队列类型
1)直接交换:SynchronousQueue(实际存不下任务)
2)无界队列:LinkedBlockingQueue
3)有界队列:ArrayBlockingQueue

threadFactory

表示线程工厂。它用来生产一组相同任务的线程。线程池的命名是通过给这个factory增加组名前缀来实现的。在虚拟机栈分析时,就可以知道线程任务是由哪个线程工厂产生的。
新线程是有ThreadFactory创建的, 默认使用 Executors.defaultThreadFactory(),创建出来的线程都在同一个线程组,拥有同样的NORM_PRIORITY,那么就可以改变线程名、线程组、优先级、是否是守护线程等。

handler

表示执行拒绝策略的对象。当超过第5个参数workQueue的任务缓存区上限的时候,就可以通过该策略处理请求,这是一种简单的限流保护。

增减线程的特点
  1. 通过设置corePoolSize和maximumPoolSize相同,就可以创建固定大小的线程池。
  2. 线程池希望保持较少的线程数,并且只有在负载变得很大时才增加它。
  3. 通过设置maximumPoolSize为很高的值,例如Integer.MAX_VALUE,可以允许线程池容纳任意数量的并发任务。
  4. 是只有在队列填满时才创建多于corePoolSize的线程,所以如果你使用的是无界队列(例如LinkedBlockingQueue),那么线程数就不会超过corePoolSize。
线程池里的线程数量设定为多少比较合适

3. 常见线程池的特点和用法

public class FixedThreadPoolOOM {

    private static ExecutorService executorService = Executors.newFixedThreadPool(1);

    public static void main(String[] args) {
        for (int i = 0; i < Integer.MAX_VALUE; i++) {
            executorService.execute(new SubThread());
        }
    }

}

class SubThread implements Runnable {


    @Override
    public void run() {
        try {
            Thread.sleep(1000000000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

VM 参数

-Xmx8m -Xms8m //将内存调小
image.png

4. 停止线程池的正确方法

1.shutdown
将线程池状态置为SHUTDOWN,并不会立即停止:

public class ShutDown {

    public static void main(String[] args) throws InterruptedException {
        ExecutorService executorService = Executors.newFixedThreadPool(10);
        for (int i = 0; i < 100; i++) {
            executorService.execute(new ShutDownTask());
        }
        Thread.sleep(1500);


        executorService.shutdown();
        executorService.execute(new ShutDownTask());
    }
}

class ShutDownTask implements Runnable {


    @Override
    public void run() {
        try {
            Thread.sleep(500);
            System.out.println(Thread.currentThread().getName());
        } catch (InterruptedException e) {
            System.out.println(Thread.currentThread().getName() + "被中断了");
        }
    }
}
image.png
  1. shutdownNow()
    将线程池状态置为STOP。企图立即停止,事实上不一定:

它试图终止线程的方法是通过调用Thread.interrupt()方法来实现的,但是大家知道,这种方法的作用有限,如果线程中没有sleep 、wait、Condition、定时锁等应用, interrupt()方法是无法中断当前的线程的。所以,ShutdownNow()并不代表线程池就一定立即就能退出,它也可能必须要等待所有正在执行的任务都执行完成了才能退出。
但是大多数时候是能立即退出的

public class ShutDown {

    public static void main(String[] args) throws InterruptedException {
        ExecutorService executorService = Executors.newFixedThreadPool(10);
        for (int i = 0; i < 100; i++) {
            executorService.execute(new ShutDownTask());
        }
        Thread.sleep(1500);
        executorService.shutdownNow();
//        List<Runnable> runnableList = executorService.shutdownNow();
        boolean b = executorService.awaitTermination(9L, TimeUnit.SECONDS);
        System.out.println(b);
        System.out.println(executorService.isTerminated());
    }
}

class ShutDownTask implements Runnable {


    @Override
    public void run() {
        try {
            Thread.sleep(500);
            System.out.println(Thread.currentThread().getName());
        } catch (InterruptedException e) {
            System.out.println(Thread.currentThread().getName() + "被中断了");
        }
    }
}
image.png

3.isShutdown
线程池状态是否为SHUTDOWN
4.isTerminated
线程池程已停止
5.awaitTermination

5. 任务太多,怎么决绝?

拒绝时间

1.当Executor关闭时,提交新任务会被拒绝。
2.以及当Executor对最大线程和工作队列容量使用有限边界并且已经饱和时。

线程池拒绝策略,通常有以下四种:

ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。 ThreadPoolExecutor.DiscardPolicy:丢弃任务,但是不抛出异常。 ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新提交被拒绝的任务 ThreadPoolExecutor.CallerRunsPolicy:由调用线程(提交任务的线程)处理该任务

  1. 钩子方法,给线程池加点料
public class PauseableThreadPool extends ThreadPoolExecutor {

    private final ReentrantLock lock = new ReentrantLock();
    private Condition unpaused = lock.newCondition();
    private boolean isPaused;


    public PauseableThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime,
                               TimeUnit unit,
                               BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }

    public PauseableThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime,
                               TimeUnit unit, BlockingQueue<Runnable> workQueue,
                               ThreadFactory threadFactory) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
    }

    public PauseableThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime,
                               TimeUnit unit, BlockingQueue<Runnable> workQueue,
                               RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
    }

    public PauseableThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime,
                               TimeUnit unit, BlockingQueue<Runnable> workQueue,
                               ThreadFactory threadFactory, RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory,
                handler);
    }


    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
        lock.lock();
        try {
            while (isPaused) {
                unpaused.await();
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

    private void pause() {
        lock.lock();
        try {
            isPaused = true;
        } finally {
            lock.unlock();
        }
    }

    public void resume() {
        lock.lock();
        try {
            isPaused = false;
            unpaused.signalAll();
        } finally {
            lock.unlock();
        }
    }


    public static void main(String[] args) throws InterruptedException {
        PauseableThreadPool pauseableThreadPool = new PauseableThreadPool(10, 20, 10l,
                TimeUnit.SECONDS, new LinkedBlockingQueue<>());
        Runnable runnable = new Runnable() {
            @Override
            public void run() {
                System.out.println("我被执行");
                try {
                    Thread.sleep(10);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        };
        for (int i = 0; i < 10000; i++) {
            pauseableThreadPool.execute(runnable);
        }
        Thread.sleep(1500);
        pauseableThreadPool.pause();
        System.out.println("线程池被暂停了");
        Thread.sleep(1500);
        pauseableThreadPool.resume();
        System.out.println("线程池被恢复了");

    }
}
...
我被执行
我被执行
我被执行
我被执行
线程池被暂停了
线程池被恢复了
我被执行
我被执行
...

6. 实现原理、源码分析

线程池组成部分
线程池实现任务复用的原理
final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    

线程池线程复用的主要代码如上;

具体分析如下:

如下图,假设是定长线程池定长为3,

1.那么线程池会根据核心线程的长度来创建线程,这样就固定了线程数量,

2.然后在runWorker的时候,while循环不停的去缓存队列中查找task;

3.有task,则运行runlable的run方法;

4.如果没有task,则执行 processWorkerExit(w, completedAbruptly),删除当前worker(移除workers中该worker的对象(HashSet<Worker> workers = new HashSet<Worker>();))以及更新workcount等操作

6. 线程池状态

1、RUNNING

(1) 状态说明:线程池处在RUNNING状态时,能够接收新任务,以及对已添加的任务进行处理。
(02) 状态切换:线程池的初始化状态是RUNNING。换句话说,线程池被一旦被创建,就处于RUNNING状态,并且线程池中的任务数为0!

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
1
2、 SHUTDOWN

(1) 状态说明:线程池处在SHUTDOWN状态时,不接收新任务,但能处理已添加的任务。
(2) 状态切换:调用线程池的shutdown()接口时,线程池由RUNNING -> SHUTDOWN。

3、STOP

(1) 状态说明:线程池处在STOP状态时,不接收新任务,不处理已添加的任务,并且会中断正在处理的任务。
(2) 状态切换:调用线程池的shutdownNow()接口时,线程池由(RUNNING or SHUTDOWN ) -> STOP。

4、TIDYING

(1) 状态说明:当所有的任务已终止,ctl记录的”任务数量”为0,线程池会变为TIDYING状态。当线程池变为TIDYING状态时,会执行钩子函数terminated()。terminated()在ThreadPoolExecutor类中是空的,若用户想在线程池变为TIDYING时,进行相应的处理;可以通过重载terminated()函数来实现。
(2) 状态切换:当线程池在SHUTDOWN状态下,阻塞队列为空并且线程池中执行的任务也为空时,就会由 SHUTDOWN -> TIDYING。
当线程池在STOP状态下,线程池中执行的任务为空时,就会由STOP -> TIDYING。

5、 TERMINATED

(1) 状态说明:线程池彻底终止,就变成TERMINATED状态。
(2) 状态切换:线程池处在TIDYING状态时,执行完terminated()之后,就会由 TIDYING -> TERMINATED。

8.使用线程池的注意点

上一篇下一篇

猜你喜欢

热点阅读