无聊到看了眼java线程池源码

2018-07-03  本文已影响24人  都有米

一、线程池框架

线程池本质上就是一个任务执行器。我们在使用线程池时使用的实现类就是ThreadPoolExecutorScheduledThreadPoolExecutor。他们之间的关系如下图所示,ScheduledThreadPoolExecutor是继承了ThreadPoolExecutor,并实现了任务调度接口。所以在使用线程池时推荐使用ScheduledThreadPoolExecutor来实现,因为它的功能更丰富。

线程池

1、Executor:任务执行器的顶层接口,就简单的一个execute方法,接收一个要执行的任务,没有强制要求异步执行,具体的逻辑由实现类去完成。

public interface Executor {
    void execute(Runnable command);
}

2、ExecutorService:执行器服务接口。它继承了接口Executor,在此基础上增加了两类功能接口,1、关闭任务执行器;2、提交任务的方法返回一个Future对象,用来跟踪任务执行的结果。ThreadPoolExecutor是该接口的一个实现类,可以通过实例化ThreadPoolExecutor来使用线程池。

public interface ExecutorService extends Executor {
    // 关闭执行者,关闭之前添加的任务还会执行,关闭后不再接受新增任务了。
    void shutdown();
    // 执行带返回值的任务
    <T> Future<T> submit(Callable<T> task);
    Future<?> submit(Runnable task);
}

3、Scheduled:调度接口,将任务列入计划(或时间)表。ScheduledThreadPoolExecutor是在ThreadPoolExecutor基础上实现了调度接口的线程池。任务调度的功能是高频需求,所有我们项目中封装线程池工具时推荐使用ScheduledThreadPoolExecutor来实现。

public interface ScheduledExecutorService extends ExecutorService {
    // 延时执行任务
    public ScheduledFuture<?> schedule(Runnable command, 
                                                 long delay, 
                                                 TimeUnit unit);
    // 周期性地执行任务
    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                  long initialDelay,
                                                  long period,
                                                  TimeUnit unit);
}

二、线程池实现

虽然我推荐大家使用ScheduledThreadPoolExecutor,但是从类图中可以看出它继承自ThreadPoolExecutor,关键的任务执行逻辑都在ThreadPoolExecutor中。所以要了解线程池执行任务的逻辑还得去看ThreadPoolExecutor类的实现。

线程池

首先我们看下ThreadPoolExecutor线程池的使用。大致如下所示,创建一个ThreadPoolExecutor实例,然后调用它的execute(runnable)方法来添加任务即可。

    // 定义相关常量
    final int CORE_POOL_SIZE = 2 * CPU_CORE_NUM;
    final int MAX_POOL_SIZE = 3 * CPU_CORE_NUM;
    final int KEEP_ALIVE_TIME = 1;

    // 创建任务队列
    final ArrayBlockingQueue<Runnable> arrayBlockingQueue = new ArrayBlockingQueue<Runnable>(50);

    // 创建线程创建工厂
    final AtomicInteger mCount = new AtomicInteger(1);
    ThreadFactory threadFactory = new ThreadFactory() {
        @Override
        public Thread newThread(Runnable r) {
            return new Thread(r, "Pool-Thread-"+mCount.getAndIncrement());
        }
    };

    // 创建线程池,注意传递的这几个参数,很重要。
    ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
            CORE_POOL_SIZE, //核心线程数
            MAX_POOL_SIZE, //最大线程数
            KEEP_ALIVE_TIME,  //空闲线程存活时间
            TimeUnit.SECONDS,  // KEEP_ALIVE_TIME数字的时间单位
            arrayBlockingQueue, // 任务队列
            threadFactory, //线程创建工厂
            new ThreadPoolExecutor.DiscardPolicy() // 任务拒绝策略处理器
    );

    // 向线程池中添加任务
    threadPoolExecutor.execute(new Runnable() {
        @Override
        public void run() {
            // do sth
            SDKLogger.d("do sth");
        }
    });

线程池构造参数

接下来重点看下线程池的execute方法。线程池执行任务的关键策略都在这里。线程池添加任务三部曲。

    public void execute(Runnable command) {
        if (command == null) {
            throw new NullPointerException();
        }

        /*
         * Proceed in 3 steps:
         *
         * 1. 如果当前运行的线程数量小于核心线程数量(corePoolSize), 
         *  就尝试创建一个新线程来运行新增任务。
         *
         * 2. 如果当前运行的线程数量大于等于核心线程数量(corePoolSize),
         *  就把新增任务添加到任务队列中去。
         *
         * 3. 如果任务队列满了,就创建新线程来执行新增任务。
         *  如果线程数量已经大于最大线程数(maximumPoolSize),就执行拒绝新增任务逻辑。
         */
        int c = ctl.get();
        // workerCountOf方法可以获取当前线程池中的线程数量
       // 第1步:threadSize < corePoolSize
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }

        // 第2步:  threadSize >= corePoolSize 把任务添加到队列中
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            // 任务添加到队列后,double check下当前线程池状态
            if (!isRunning(recheck) && remove(command)) {
                reject(command);
            } else if (workerCountOf(recheck) == 0) {
                addWorker(null, false);
            }
        // 第3步:  如果队列满了,就创建更多的线程
        //        如果threadSize >= maximumPoolSize  就拒绝接受任务
        } else if (!addWorker(command, false)) {
            reject(command);
        }
    }

三、线程池使用推荐

官方为了方便大家使用线程池提供了一个执行器工具类Executors,该类提供了一系列工厂方法用于创先线程池。但是我并不推荐大家使用这个工具类来创建线程池。我们来看下它的实现。

public class Executors {
    // 主要问题:没有设置任务队列容量,默认容量是 Integer.MAX_VALUE ,
    // 堆积的任务可能会耗费非常大的内存,甚至OOM。
    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }
    // 同上
    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }
    // 主要问题:线程数最大数是Integer.MAX_VALUE,可能会创建数量非常多的线程,甚至OOM。
    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }
    // 同上
    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
    }

    ...
}

所以线程池不建议使用Executors去创建,而是通过ThreadPoolExecutor或者ScheduledThreadPoolExecutor的方式创建,这样的处理方式可以让大家更加明确线程池的运行规则,规避资源耗尽的风险。 Executors各个方法的弊端:

  1. newFixedThreadPool和newSingleThreadExecutor:
      主要问题是堆积的请求处理队列可能会耗费非常大的内存,甚至OOM。
  2. newCachedThreadPool和newScheduledThreadPool:
      主要问题是线程数最大数是Integer.MAX_VALUE,可能会创建数量非常多的线程,甚至OOM。

下面AsyncTask中创建线程池的例子,创建线程池的方式和一些数据经验值,大家都可以借鉴下:

// 获取虚拟机可用的处理器数量
private static final int CPU_COUNT = Runtime.getRuntime().availableProcessors();
// 核心线程数量至少2个,最多4个;
// 另外设置核心线程数比处理器数量少一个,是为了避免CPU在后台饱和工作。
private static final int CORE_POOL_SIZE = Math.max(2, Math.min(CPU_COUNT - 1, 4));
private static final int MAXIMUM_POOL_SIZE = CPU_COUNT * 2 + 1;
private static final int KEEP_ALIVE_SECONDS = 30;

private static final ThreadFactory sThreadFactory = new ThreadFactory() {
    private final AtomicInteger mCount = new AtomicInteger(1);

    public Thread newThread(Runnable r) {
        return new Thread(r, "AsyncTask #" + mCount.getAndIncrement());
    }
};

// 设置任务队列容量128
private static final BlockingQueue<Runnable> sPoolWorkQueue =
        new LinkedBlockingQueue<Runnable>(128);

public static final Executor THREAD_POOL_EXECUTOR;
static {
    // 使用ThreadPoolExecutor创建线程池
    ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
            CORE_POOL_SIZE, MAXIMUM_POOL_SIZE, KEEP_ALIVE_SECONDS, TimeUnit.SECONDS,
            sPoolWorkQueue, sThreadFactory);
    threadPoolExecutor.allowCoreThreadTimeOut(true);
    THREAD_POOL_EXECUTOR = threadPoolExecutor;
}

四、分而治之:Fork/Join框架

补充完整图、使用、实现

上一篇下一篇

猜你喜欢

热点阅读