(3)线程系列 - 一起解读ThreadUtils源码
开发第三方库的时候往往不会用到rxjava,会使用Thread来处理线程。那么就直接开门见山,让我们一步一步解读ThreadUtils源码。作者阅读源码习惯是先从public方法一个一个硬读下去,再配合使用方法来使用。
-
public static boolean isMainThread()
这个方法很简单,调用Android原生代码Looper来判断是否是主线程,关于Android代码Looper在第四章上会有详解 -
public static Handler getMainHandler()
代码是直接返回HANDLER,而这个HANDLER是一个全局静态主线程的变量 new Handler(Looper.getMainLooper()); -
public static void runOnUiThread(final Runnable runnable)
在ui主线程上执行事件,会先通过Looper.myLooper()判断当前线程是否是ui主线程,如果是直接运行事件,如果不是,则通过上面的HANDLER来运行事件 -
public static ExecutorService getFixedPool(@IntRange(from = 1) final int size)
该方法返回一个线程池,size参数则是线程池中线程数量,从注释和方法名中可以知道该方法FixedPool是个对线程数做限制的线程池,用于并发压力的场景下,那么我们沿着这个方法看看怎么实现创建线程池的 -
private static ExecutorService getPoolByTypeAndPriority(final int type)
getFixedPool
方法调用了该方法,该方法运行代码getPoolByTypeAndPriority(type, Thread.NORM_PRIORITY);Thread.NORM_PRIORITY是个优先级,优先级是5,而线程中是1-10,优先级数字越大则是更优先,为什么getFixedPool
方法的size参数到这个方法变成了type参数了呢?因为其他type参数都是负数的,而这个传递必须是1以上,所以这个设置xx数量的线程方法,已经自动归纳为另一个类型了。 -
private static ExecutorService getPoolByTypeAndPriority(final int type, final int priority)
该方法用到了全局静态Map<Integer, Map<Integer, ExecutorService>> TYPE_PRIORITY_POOLS,是个通过类型存储线程池的map,完整代码注释如下
/**
* 根据类型获取线程池
* @param type 类型
* @param priority 优先级
* @return 线程池
*/
private static ExecutorService getPoolByTypeAndPriority(final int type, final int priority) {
// 同步map安全,防止线程不安全创建多个 Map线程池
synchronized (TYPE_PRIORITY_POOLS) {
ExecutorService pool;
// 通过类型获取 Map线程池
Map<Integer, ExecutorService> priorityPools = TYPE_PRIORITY_POOLS.get(type);
if (priorityPools == null) {
// 如果没有 Map线程池 则新建一个
priorityPools = new ConcurrentHashMap<>();
pool = ThreadPoolExecutor4Util.createPool(type, priority);
// 加入线程池
priorityPools.put(priority, pool);
// 新建后加入 Map线程池
TYPE_PRIORITY_POOLS.put(type, priorityPools);
} else {
// 根据线程池优先级获取线程池
pool = priorityPools.get(priority);
// 如果没有该线程池,则创建新的线程池
if (pool == null) {
pool = ThreadPoolExecutor4Util.createPool(type, priority);
priorityPools.put(priority, pool);
}
}
return pool;
}
}
从上面代码可知Map TYPE_PRIORITY_POOLS里面包含了Map ExecutorService,
也就是说线程池包含进了两层map,最外的第一层是key为线程池类型的,里面的第二层是key为优先级的。
-
ThreadPoolExecutor4Util.createPool
在上面代码注释中可知这句核心关键代码,创建线程池,下面详细阶段这段代码 -
static final class ThreadPoolExecutor4Util extends ThreadPoolExecutor
该类继承了ThreadPoolExecutor来创建线程池,分别创建了以下几种线程,跟rxjava是不是有点像呢
/**
* 创建线程池
* @param type 类型
* @param priority 优先级
* @return 线程池
*/
private static ExecutorService createPool(final int type, final int priority) {
switch (type) {
case TYPE_SINGLE:
// 创建 核心线程数为1,线程池最大线程数量为1,非核心线程空闲存活时长为0
// 只创建一个线程确保 顺序执行的场景,并且只有一个线程在执行
return new ThreadPoolExecutor4Util(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue4Util(),
new UtilsThreadFactory("single", priority)
);
case TYPE_CACHED:
// 创建 核心线程数为0,线程池最大线程数量为128,非核心线程空闲存活时长为60秒
// 线程数为128个一般用于处理执行时间比较短的任务
return new ThreadPoolExecutor4Util(0, 128,
60L, TimeUnit.SECONDS,
new LinkedBlockingQueue4Util(true),
new UtilsThreadFactory("cached", priority)
);
case TYPE_IO:
// 创建 核心线程数为可计算资源*2+1,线程池最大线程数量为可计算资源*2+1,非核心线程空闲存活时长为30秒
return new ThreadPoolExecutor4Util(2 * CPU_COUNT + 1, 2 * CPU_COUNT + 1,
30, TimeUnit.SECONDS,
new LinkedBlockingQueue4Util(),
new UtilsThreadFactory("io", priority)
);
case TYPE_CPU:
// 创建 核心线程数为可计算资源+1,线程池最大线程数量为可计算资源*2+1,非核心线程空闲存活时长为30秒
return new ThreadPoolExecutor4Util(CPU_COUNT + 1, 2 * CPU_COUNT + 1,
30, TimeUnit.SECONDS,
new LinkedBlockingQueue4Util(true),
new UtilsThreadFactory("cpu", priority)
);
default:
// 创建 核心线程数、线程池最大数量为自定义的,空闲存活时长为0
return new ThreadPoolExecutor4Util(type, type,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue4Util(),
new UtilsThreadFactory("fixed(" + type + ")", priority)
);
}
}
除了创建,还重写了afterExecute方法和execute方法,不过这两个方法暂时没什么意义所以先不管。
在线程系列第二节讲到了创建线程池需要的ThreadFactory threadFactory(线程工厂)、BlockingQueue workQueue(任务队列),该ThreadUtils类自定义了线程工厂和任务队列,所以我们先了解该两类
-
private static final class LinkedBlockingQueue4Util extends LinkedBlockingQueue<Runnable>
我们先了解LinkedBlockingQueue类,LinkedBlockingQueue这个队列接收到任务的时候,如果当前线程数小于核心线程数,则新建线程(核心线程)处理任务;如果当前线程数等于核心线程数,则进入队列等待。由于这个队列没有最大值限制,即所有超过核心线程数的任务都将被添加到队列中,这也就导致了 maximumPoolSize 的设定失效,因为总线程数永远不会超过 核心线程数。
该类重写了offer方法,如果看过线程池的源码的同学,会发现如下图:
是的,执行pool.execute后会执行队列的offer方法,那为什么重写offer方法呢,通过上面的解释可以知道,LinkedBlockingQueue的总线程数永远不会超过核心线程数,通过重写它,可以自定义TYPE_CACHED类型的线程池的核心线程为0,总线程数128。 -
static final class UtilsThreadFactory extends AtomicLong implements ThreadFactory
线程工厂类,该类重写只是简单的创建了一个Thread返回
那么在这里我们基本知道了创建线程池的流程了,接下来,是我们调用该工具类时调用线程的整个流程是怎样的,通过断点调试,我们是最容易了解这个过程的,如图:
调用线程的整个流程
-
onClick
和testSingle
都是app上业务使用的方法 -
public static <T> void executeByCached(final BaseTask<T> baseTask)
关键代码是直接创建了一个任务BaseTask传递到线程池使用,那么我们看看BaseTask方法,已经针对run方面的代码进行了全部注释,更多的注释可以下载源码观看
@Override
public void run() {
// 判断是否循环计划内的
if (isSchedule) {
// 因为如果是在循环内的,那么runner还是之前的
if (runner == null) {
// 判断当前状态如果是New,便赋值state=RUNNING,如果不是New,便返回
if (!state.compareAndSet(NEW, RUNNING)) {
return;
}
// 获取当前线程
runner = Thread.currentThread();
if (mTimeoutListener != null) {
Log.w("ThreadUtils", "Scheduled task doesn't support timeout.");
}
} else {
// 如果不是RUNNING便直接返回
if (state.get() != RUNNING) {
return;
}
}
} else {
// 判断当前状态如果是New,便赋值state=RUNNING,如果不是New,便返回
if (!state.compareAndSet(NEW, RUNNING)) {
return;
}
// 获取当前线程
runner = Thread.currentThread();
if (mTimeoutListener != null) {
// 实例化 循环或延迟任务的线程池
mExecutorService = new ScheduledThreadPoolExecutor(1, (ThreadFactory) Thread::new);
// 调用了延迟运行任务
mExecutorService.schedule(new TimerTask() {
@Override
public void run() {
if (!isDone() && mTimeoutListener != null) {
timeout();
mTimeoutListener.onTimeout();
}
}
}, mTimeoutMillis, TimeUnit.MILLISECONDS);
}
}
try {
// 执行doInBackground方法获取值
final T result = doInBackground();
// 判断是否循环计划内的
if (isSchedule) {
// 如果不是RUNNING便直接返回
if (state.get() != RUNNING) {
return;
}
getDeliver().execute(() -> onSuccess(result));
} else {
// 判断当前状态如果是RUNNING,便赋值state=COMPLETING,如果不是RUNNING,便返回
if (!state.compareAndSet(RUNNING, COMPLETING)) {
return;
}
// 执行成功方法,getDeliver()已经封装了跳转ui线程
getDeliver().execute(() -> {
onSuccess(result);
onDone();
});
}
} catch (InterruptedException ignore) {
// 被中断了,判断当前状态如果是CANCELLED,便赋值state=INTERRUPTED
state.compareAndSet(CANCELLED, INTERRUPTED);
} catch (final Throwable throwable) {
// 如果出现异常了,判断当前状态如果是RUNNING,便赋值EXCEPTIONAL
if (!state.compareAndSet(RUNNING, EXCEPTIONAL)) {
return;
}
// 执行成功方法,getDeliver()已经封装了跳转ui线程
getDeliver().execute(() -> {
onFail(throwable);
onDone();
});
}
}
-
cancel
ThreadUtils的作者没明说Activity销毁是否要写cancel事件
从上面整个流程可以几个核心方法,我概括出来:
- 拥有者跟rxjava类似的doInBackground,onSucces等等方法
- 通过源码得知有延时、循环周期运行线程等方法
- 根据使用方式不一样来决定io,cpu线程数量,缓存时间,是否使用队列