线程池第一印象~

2019-05-28  本文已影响0人  SonyaBaby
进阶线程池啦~.png ThreadPoolExecutor 继承关系.png ThreadPoolExecutor 方法Structure.png AbstractExecutorService 方法Structure.png ExecutorService 方法Structure.png 最顶层接口 Executor.png

ThreadPoolExecutor

public class ThreadPoolExecutor extends AbstractExecutorService {
...
    public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler);
    }

    public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, defaultHandler);
    }

    public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), handler);
    }

    public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
        if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }
...
}
ThreadPoolExecutor 四个构造器.png

看源码可知前三个构造器最终调用的都是第四个进行初始化工作。

workQueue 等待队列

handler 拒绝处理任务时使用的策略

核心方法
execute() 是在Executor接口中的声明,通过这个方法可以向线程池提交一个任务,交由线程池去执行
submit() 是在ExecutorService中声明的方法,在AbstractExecutorService就已经有了具体的实现,在ThreadPoolExecutor中并没有对其进行重写,这个方法也是用来向线程池提交任务的,但是它和execute()方法不同,它能够返回任务执行的结果,去看submit()方法的实现,会发现它实际上还是调用的execute()方法,只不过它利用了Future来获取任务执行结果
shutdown() 不会立即终止线程池,而是要等所有任务缓存队列中的任务都执行完后才终止,但再也不会接受新的任务
shutdownNow() 立即终止线程池,并尝试打断正在执行的任务,并且清空任务缓存队列,返回尚未执行的任务

线程池状态

    // runState is stored in the high-order bits
    private static final int RUNNING    = -1 << COUNT_BITS;
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    private static final int STOP       =  1 << COUNT_BITS;
    private static final int TIDYING    =  2 << COUNT_BITS;
    private static final int TERMINATED =  3 << COUNT_BITS;
    /**
     *   RUNNING:  Accept new tasks and process queued tasks
     *   SHUTDOWN: Don't accept new tasks, but process queued tasks
     *   STOP:     Don't accept new tasks, don't process queued tasks,
     *             and interrupt in-progress tasks
     *   TIDYING:  All tasks have terminated, workerCount is zero,
     *             the thread transitioning to state TIDYING
     *             will run the terminated() hook method
     *   TERMINATED: terminated() has completed
     *   
     * RUNNING -> SHUTDOWN
     *    On invocation of shutdown(), perhaps implicitly in finalize()
     * (RUNNING or SHUTDOWN) -> STOP
     *    On invocation of shutdownNow()
     * SHUTDOWN -> TIDYING
     *    When both queue and pool are empty
     * STOP -> TIDYING
     *    When pool is empty
     * TIDYING -> TERMINATED
     *    When the terminated() hook method has completed
    **/

任务的执行相关重要参数

 // 任务缓存队列,用来存放等待执行的任务
private final BlockingQueue<Runnable> workQueue;
// 线程池的主要状态锁,对线程池状态(比如线程池大小、runState等)的改变都要使用这个锁
private final ReentrantLock mainLock = new ReentrantLock();

// Accessed only under mainLock.
// 用来存放工作集   
private final HashSet<Worker> workers = new HashSet<Worker>();
// 用来记录线程池中曾经出现过的最大线程数
private int largestPoolSize;
// 用来记录已经执行完毕的任务个数
private long completedTaskCount;

// 线程工厂类,用来创建线程
private volatile ThreadFactory threadFactory;
// 任务拒绝策略
private volatile RejectedExecutionHandler handler;
// 线程存活时间
private volatile long keepAliveTime;
// 是否允许为核心线程设置存活时间
private volatile boolean allowCoreThreadTimeOut;
// 核心池的大小(即线程池中的线程数目大于这个参数时,提交的任务会被放进任务缓存队列)
private volatile int corePoolSize;

// //线程池最大能容忍的线程数. Note that the actual maximum is internally bounded by CAPACITY.
private volatile int maximumPoolSize;

// 默认的任务拒绝策略:丢弃任务并抛出RejectedExecutionException异常
private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();

corePoolSize是正常情况下线程池大小,maximumPoolSize是线程池的一种额外Support,即任务量突然过大时的额外可支持的开销
largestPoolSize只是用来记录线程池中曾经有过的最大线程数目,跟线程池的容量没有任何关系。

execute()

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    int c = ctl.get();
   
    /** 1. If fewer than corePoolSize threads are running, try to start a new thread with the given command as its first  task.  
      * The call to addWorker atomically checks runState and workerCount, and so prevents false alarms that would add  threads when it shouldn't, by returning false.
      */
    // 线程池中正在作业的线程数 < corePoolSize数
    if (workerCountOf(c) < corePoolSize) {
        // 是否可以继续向 corePool 中新增线程
        if (addWorker(command, true))
            return;
        // 再次获取当前线程池状态值
        c = ctl.get();
    }

    /** 2. If a task can be successfully queued, then we still need to double-check whether we should have added a thread (because existing ones died since last checking) or that the pool shut down since entry into this method.
      * So we recheck state and if necessary roll back the enqueuing if stopped, or start a new thread if there are none.
      */
    // 线程池为可运行状态,同时Runnable command可以加入等待队列
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        //  非Running状态,则从队列中去掉command并执行拒绝策略
        if (! isRunning(recheck) && remove(command))
            reject(command);

       /** 3. If we cannot queue task, then we try to add a new thread.  
         * If it fails, we know we are shut down or saturated and so reject the task.
         */
         // 新增thread
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    // 不可以在拓展的线程池中运行该实例,执行拒绝策略
    else if (!addWorker(command, false))
        reject(command);
}

使用 ThreadPoolExecutor 创建线程池:

public class HelloThreadPool {
  public static void main(String[] args) {
    ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 2,
      100, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(2));
    
    IntStream.range(0, 4).mapToObj(PrintTask::new).forEach(printTask -> {
      executor.execute(printTask);
      System.out.println("线程池中所有线程数目:" + executor.getPoolSize() + ",队列中待执行的任务数目:" +
        executor.getQueue().size() + ",已执行完的任务数目:" + executor.getCompletedTaskCount());
    });
    
    executor.shutdown();
  }
}

class PrintTask implements Runnable {
  private int taskIndex;
  
  PrintTask(int index) {
    this.taskIndex = index;
  }
  
  @Override
  public void run() {
    System.out.println(taskIndex + " is running...");
     try {
     TimeUnit.SECONDS.sleep(2);
     } catch (InterruptedException e) {
     e.printStackTrace();
     }
    System.out.println(taskIndex + " end");
  }
}

output:

0 is running...
线程池中所有线程数目:1,队列中待执行的任务数目:0,已执行完的任务数目:0
线程池中所有线程数目:1,队列中待执行的任务数目:1,已执行完的任务数目:0
线程池中所有线程数目:1,队列中待执行的任务数目:2,已执行完的任务数目:0
线程池中所有线程数目:2,队列中待执行的任务数目:2,已执行完的任务数目:0
3 is running...
3 end
0 end
1 is running...
2 is running...
2 end
1 end

当把要执行的实例变成 5 个就会出现 RejectedExecutionException 异常:

java.util.concurrent.RejectedExecutionException: 
Task threadPool.PrintTask@7699a589 rejected from 
java.util.concurrent.ThreadPoolExecutor@58372a00[Running, pool size = 2, active threads = 2, queued tasks = 2, completed tasks = 0]

java doc中,并不提倡我们直接使用 ThreadPoolExecutor ,而是使用 Executors 类中提供的几个静态方法来创建线程池

    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
    }

    public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
        return new DelegatedScheduledExecutorService(new ScheduledThreadPoolExecutor(1));
    }

    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
    }

参考链接:https://www.cnblogs.com/dolphin0520/p/3932921.html

上一篇 下一篇

猜你喜欢

热点阅读