Java基础

并发编程(八)线程池详解

2021-01-13  本文已影响0人  Timmy_zzh
前言
既然线程的创建和销毁非常消耗性能,那有没有可能复用已经被创建好的线程呢?

1.线程池

1.1.线程池体系
1.线程池体系.png
源码
// 底层接口Executor,只有一个方法
public interface Executor {
    void execute(Runnable var1);
}

// 接口ExecutorService,抽象了线程池操作方法
public interface ExecutorService extends Executor {
    void shutdown();
    List<Runnable> shutdownNow();
    boolean isShutdown();
    boolean isTerminated();
    boolean awaitTermination(long var1, TimeUnit var3) throws InterruptedException;
    <T> Future<T> submit(Callable<T> var1);
    <T> Future<T> submit(Runnable var1, T var2);
    Future<?> submit(Runnable var1);
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> var1) throws InterruptedException;
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> var1, long var2, TimeUnit var4) throws InterruptedException;
    <T> T invokeAny(Collection<? extends Callable<T>> var1) throws InterruptedException, ExecutionException;
    <T> T invokeAny(Collection<? extends Callable<T>> var1, long var2, TimeUnit var4) throws InterruptedException, ExecutionException, TimeoutException;
}

public abstract class AbstractExecutorService implements ExecutorService {
    public AbstractExecutorService() {
    }
}

public class ThreadPoolExecutor extends AbstractExecutorService {
    private final AtomicInteger ctl;
    private static final int COUNT_BITS = 29;
    private static final int CAPACITY = 536870911;
    private static final int RUNNING = -536870912;
    private static final int SHUTDOWN = 0;
    private static final int STOP = 536870912;
    private static final int TIDYING = 1073741824;
    private static final int TERMINATED = 1610612736;
    private final BlockingQueue<Runnable> workQueue;
    private final ReentrantLock mainLock;
    private final HashSet<ThreadPoolExecutor.Worker> workers;
    private final Condition termination;
    private int largestPoolSize;
    private long completedTaskCount;
    private volatile ThreadFactory threadFactory;
    private volatile RejectedExecutionHandler handler;
    private volatile long keepAliveTime;
    private volatile boolean allowCoreThreadTimeOut;
    private volatile int corePoolSize;
    private volatile int maximumPoolSize;
    private static final RejectedExecutionHandler defaultHandler = new ThreadPoolExecutor.AbortPolicy();
    private static final RuntimePermission shutdownPerm = new RuntimePermission("modifyThread");
    private final AccessControlContext acc;
    private static final boolean ONLY_ONE = true;
    ...
}

2.线程池创建

Executors
2.1.newSingleThreadExecutor
    //使用单线程线程池提交5次操作任务
    private static void singleThreadPool() {
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        for (int i = 0; i < 5; i++) {
            int taskId = i;
            executorService.submit(new Runnable() {
                @Override
                public void run() {
                    System.out.println("Thread:" + 
                                       Thread.currentThread().getName() +
                            " ,running task:" + taskId);
                }
            });
        }
    }
日志打印:
Thread:pool-1-thread-1 ,running task:0
Thread:pool-1-thread-1 ,running task:1
Thread:pool-1-thread-1 ,running task:2
Thread:pool-1-thread-1 ,running task:3
Thread:pool-1-thread-1 ,running task:4

源码:
    public static ExecutorService newSingleThreadExecutor() {
        return new Executors.FinalizableDelegatedExecutorService(
            new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
                                   new LinkedBlockingQueue()));
    }
2.2.newCachedThreadPool
    private static void cacheThreadPool() {
        ExecutorService executorService = Executors.newCachedThreadPool();
        for (int i = 0; i < 5; i++) {
            int taskId = i;
            try {
                //Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            executorService.submit(new Runnable() {
                @Override
                public void run() {
                    System.out.println("Thread:" + 
                                       Thread.currentThread().getName() +
                            " ,running task:" + taskId);
                    try {
                        // 模拟耗时任务
                        Thread.sleep(500);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            });
        }
    }
日志打印:
Thread:pool-1-thread-1 ,running task:0
Thread:pool-1-thread-1 ,running task:1
Thread:pool-1-thread-1 ,running task:2
Thread:pool-1-thread-1 ,running task:3
Thread:pool-1-thread-1 ,running task:4
    
源码:
    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, 
                                      TimeUnit.SECONDS,
                                      new SynchronousQueue());
    }
2.3.newFixedThreadPool
//创建一个固定数目的,可重用的线程池
    private static void newFixedThreadPool() {
        ExecutorService executorService = Executors.newFixedThreadPool(3);
        for (int i = 0; i < 10; i++) {
            int taskId = i;
            executorService.submit(new Runnable() {
                @Override
                public void run() {
                    System.out.println("Thread:" + 
                                       Thread.currentThread().getName() +
                            " ,running task:" + taskId);
                    try {
                        Thread.sleep(300);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            });
        }
    }
日志打印:
> Task :testlib:_05ThreadPool.main()
Thread:pool-1-thread-2 ,running task:1
Thread:pool-1-thread-3 ,running task:2
Thread:pool-1-thread-1 ,running task:0
Thread:pool-1-thread-1 ,running task:3
Thread:pool-1-thread-2 ,running task:5
Thread:pool-1-thread-3 ,running task:4
Thread:pool-1-thread-1 ,running task:6
Thread:pool-1-thread-2 ,running task:7
Thread:pool-1-thread-3 ,running task:8
Thread:pool-1-thread-1 ,running task:9
    
源码:
    public static ExecutorService newFixedThreadPool(int var0) {
        return new ThreadPoolExecutor(var0, var0, 0L,
                                      TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue());
    }
2.4.newScheduledThreadPool
    private static void newScheduledThreadPool() {
        ScheduledExecutorService scheduledExecutorService = 
            Executors.newScheduledThreadPool(2);
        scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                Date now = new Date();
                System.out.println("Thread:" + Thread.currentThread().getName()
                                   + " ,timer:" + now.toString());
            }
        },500,500, TimeUnit.MILLISECONDS);

        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        scheduledExecutorService.shutdown();
    }
日志打印:
> Task :testlib:_05ThreadPool.main()
Thread:pool-1-thread-1 ,timer:Thu Jan 07 22:23:44 CST 2021
Thread:pool-1-thread-1 ,timer:Thu Jan 07 22:23:44 CST 2021
Thread:pool-1-thread-1 ,timer:Thu Jan 07 22:23:45 CST 2021
Thread:pool-1-thread-2 ,timer:Thu Jan 07 22:23:45 CST 2021
Thread:pool-1-thread-1 ,timer:Thu Jan 07 22:23:46 CST 2021
Thread:pool-1-thread-1 ,timer:Thu Jan 07 22:23:46 CST 2021
Thread:pool-1-thread-2 ,timer:Thu Jan 07 22:23:47 CST 2021
Thread:pool-1-thread-1 ,timer:Thu Jan 07 22:23:47 CST 2021
Thread:pool-1-thread-2 ,timer:Thu Jan 07 22:23:48 CST 2021
Thread:pool-1-thread-2 ,timer:Thu Jan 07 22:23:48 CST 2021

3.线程池执行机制

3.1.线程池结构
2.线程池结构.png
public class ThreadPoolExecutor extends AbstractExecutorService {
    //根据AtomicInteger 的低29位,计算线程池线程的数量
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    private static final int COUNT_BITS = Integer.SIZE - 3; //29
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

    // 线程中包含的状态
    //默认状态,接收新任务并处理排队任务
    private static final int RUNNING    = -1 << COUNT_BITS;
    //不接收新任务,但处理排队任务,调用shotdown 会处于该状态
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    //不接收新任务,也不处理排队任务,并中断正在执行的任务,调用showdownNow()
    private static final int STOP       =  1 << COUNT_BITS;
    //所有任务都已终止,workerCount为零时,线程会切换到TIDYING状态,并运行terminate方法
    private static final int TIDYING    =  2 << COUNT_BITS;
    //terminate方法完成后线程池切换为该状态
    private static final int TERMINATED =  3 << COUNT_BITS;
    
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }
    
    // 线程池重要方法
    //1.计算当前线程池运行状态
    private static int runStateOf(int c) {
        return c & -CAPACITY;
    }

    //2.计算当前线程数量
    private static int workerCountOf(int c) {
        return c & CAPACITY;
    }

    //3.通过状态和线程数生成ctl
    private static int ctlOf(int rs, int wc) {
        return rs | wc;
    }
    
}
3.2.线程池执行流程
3.线程池执行流程.png
  1. 当线程池中运行的线程数量还没有达到corePoolSize大小时,线程池会创建一个新线程执行提交的任务,无论之前创建的线程是否处于空闲状态
  2. 当线程池中运行的线程数量已经达到corePoolSize大小时,线程池会把任务加入到等待队列中,直到某一个线程空闲了,线程池会根据我们设置的等待队列规则,从队列中取出一个新的任务执行
  3. 如果线程数大于corePoolSize数量,但是还没有达到最大线程数manimumPoolSize,并且等待队列已满,则线程池会创建新的线程来执行任务
  4. 最后如果提交的任务,无法被核心线程直接执行,又无法加入等待队列,又无法创建”非核心线程“直接执行,线程池根据拒绝处理器定义的策略处理这个任务。
    • 如果没有在线程池中设置拒绝策略,线程池会抛出RejectedExecutionException 异常,即线程池拒绝接受这个任务。
上一篇 下一篇

猜你喜欢

热点阅读