多线程,从解剖AsyncTask开始
常见使用方式
这里我以图片尺寸压缩任务来举个栗子:
/**
* 压缩
*
* @param fileInputPath 需要压缩图片SD卡路径
* @param compressCallback 压缩结果回调
*/
@MainThread
public void bitmapCompress(String fileInputPath, BitmapCompressCallback compressCallback) {
// BitmapCropTask继承并实现了AsyncTask的抽象方法doInBackground,执行异步任务进行压缩,压缩完回调compressCallback
new BitmapCropTask(compressCallback).execute(fileInputPath);
}
AsyncTask源码分析
public abstract class AsyncTask<Params, Progress, Result> {
......
@WorkerThread
protected abstract Result doInBackground(Params... params);
......
}
我们发现AsyncTask是一个泛型类,三个参数分别是:
- Params:doInBackground方法的入参,它是可变参数;
- Progress:AsyncTask所执行后台任务的进度;
- Result:后台任务的返回结果。
同时也是一个抽象类,具体业务需要继承AsyncTask并实现其doInBackground方法
首先我们看看构造器中发生了什么,
private static abstract class WorkerRunnable<Params, Result> implements Callable<Result> {
Params[] mParams;
}
private final WorkerRunnable<Params, Result> mWorker;
private final FutureTask<Result> mFuture;
/**
* Creates a new asynchronous task. This constructor must be invoked on the UI thread.
*/
public AsyncTask() {
mWorker = new WorkerRunnable<Params, Result>() {
public Result call() throws Exception {
mTaskInvoked.set(true);// 任务开始执行标记,原子操作
Result result = null;
try {
Process.setThreadPriority(Process.THREAD_PRIORITY_BACKGROUND);// 设置线程的优先级
//noinspection unchecked
result = doInBackground(mParams);// 任务执行返回result
Binder.flushPendingCommands();
} catch (Throwable tr) {
mCancelled.set(true);// 任务执行过程发生异常,标记任务为取消,原子操作
throw tr;
} finally {
postResult(result);// 最终将执行结果post到Handler中,传递到目标主线程中
}
return result;// 后台任务执行的结果
}
};
mFuture = new FutureTask<Result>(mWorker) {// 将后台任务包装到FutureTask中
/**
* 任务结束时(正常执行结束或者取消),该回调函数会被触发
*/
@Override
protected void done() {
try {
postResultIfNotInvoked(get());// 调用get方法阻塞当前线程直到获取到任务执行结果
} catch (InterruptedException e) {// 如果任务线程执行时恰好cancel,即mayInterruptIfRunning为true
android.util.Log.w(LOG_TAG, e);
} catch (ExecutionException e) {
throw new RuntimeException("An error occurred while executing doInBackground()",
e.getCause());
} catch (CancellationException e) {// 任务被取消,根本未执行
postResultIfNotInvoked(null);
}
}
};
}
我们注意到,构造器中实例化了两个对象,WorkerRunnable(Callable)对象和FutureTask对象。熟悉Java并发编程的人应该都知道这两种类型,下面具体分析:
Callable
Callable是类似于Runnable的接口,实现Callable接口的类和实现Runnable的类都是可被其他线程执行的任务。
Callable的接口定义如下:
public interface Callable<V> {
V call() throws Exception;
}
Callable和Runnable的区别如下:
-
Callable定义的方法是call,而Runnable定义的方法是run。
-
Callable的call方法可以有返回值,而Runnable的run方法不能有返回值。
-
Callable的call方法可抛出异常,而Runnable的run方法不能抛出异常。
FutureTask
public class FutureTask<V> implements RunnableFuture<V> {
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;// 对Callable对象的包装
this.state = NEW; // ensure visibility of callable
}
public void run() {
if (state != NEW ||
!U.compareAndSwapObject(this, RUNNER, null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
result = c.call();// c就是被包装进FutureTask的mWorker对象,就是通过调起mWorker.call方法开始执行任务
ran = true;
} catch (Throwable ex) {// 后台任务内部执行异常
result = null;
ran = false;
setException(ex);
}
if (ran)
set(result);// 设置后台任务执行结果
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
FutureTask实现的RunnableFuture接口,RunnableFuture兼具Runnable规范和Future规范,既可以执行任务,也可以获取任务的执行结果,支持取消任务。FutureTask帮助我们实现了具体的任务执行以及和Future接口中的get方法的关联。
任务执行
@MainThread
public final AsyncTask<Params, Progress, Result> execute(Params... params) {
// 应该可以看到,不管上层有多少个AsyncTask执行execute,任务的执行都是落到一个全局静态的sDefaultExecutor去执行的
return executeOnExecutor(sDefaultExecutor, params);
}
execute方法内部调用了executeOnExecutor方法,并传入了一个sDefaultExecutor以及任务的入参,那么注意到这个静态全局sDefaultExecutor又是什么呢?
SerialExecutor
public static final Executor SERIAL_EXECUTOR = new SerialExecutor();
private static volatile Executor sDefaultExecutor = SERIAL_EXECUTOR;
private static class SerialExecutor implements Executor {
final ArrayDeque<Runnable> mTasks = new ArrayDeque<Runnable>();// 任务缓存队列,基于ArrayDeque,线程不安全,但速度快于LinkedList
Runnable mActive;// 当前执行的任务对象
/**
* 这里需要做同步控制,因为上层AsyncTask不明确是在什么线程中开始执行execute,因为缓存任务的ArrayDeque不是线程安全的,所以这个共享资源的访问需要做同步互斥的控制,包括后面的scheduleNext也是一样,因为任务的调度是在上一个任务执行的线程中开始调度
*/
public synchronized void execute(final Runnable r) {
mTasks.offer(new Runnable() {// 将任务添加到ArrayDeque类型的任务队列中
public void run() {
try {
r.run();// 这里的Runnable就是mFuture
} finally {
// 当前任务在相应线程执行完成之后,开始调度其之后入队的任务开始运行
scheduleNext();
}
}
});
// 任务队列里的任务尚未开始被执行,则从此开始消耗任务队列
if (mActive == null) {
scheduleNext();
}
}
protected synchronized void scheduleNext() {
if ((mActive = mTasks.poll()) != null) {// 一直从队列头部获取任务,不为空就通过线程池去执行任务
THREAD_POOL_EXECUTOR.execute(mActive);// 交给自定义线程池去执行
}
}
}
SerialExecutor是一个多任务串行执行器,从ActivityThread源码中可以发现SERIAL_EXECUTOR 是Android3.1以后的默认任务执行器。SerialExecutor主要用于任务的排队,做到了先进先出的顺序;而真正的任务执行是THREAD_POOL_EXECUTOR。
ActivityThread.java
// If the app is Honeycomb MR1 or earlier, switch its AsyncTask
// implementation to use the pool executor. Normally, we use the
// serialized executor as the default. This has to happen in the
// main thread so the main looper is set right.
if (data.appInfo.targetSdkVersion <= android.os.Build.VERSION_CODES.HONEYCOMB_MR1) {
AsyncTask.setDefaultExecutor(AsyncTask.THREAD_POOL_EXECUTOR);
}
THREAD_POOL_EXECUTOR
这个自定义的线程池配置参数如下:
private static final int CPU_COUNT = Runtime.getRuntime().availableProcessors();// 可用CPU数量
// We want at least 2 threads and at most 4 threads in the core pool,
// preferring to have 1 less than the CPU count to avoid saturating
// the CPU with background work
private static final int CORE_POOL_SIZE = Math.max(2, Math.min(CPU_COUNT - 1, 4));// 核心线程数量
private static final int MAXIMUM_POOL_SIZE = CPU_COUNT * 2 + 1;// 线程池大小
private static final int KEEP_ALIVE_SECONDS = 30;// 闲置时间
// 线程创建工厂
private static final ThreadFactory sThreadFactory = new ThreadFactory() {
private final AtomicInteger mCount = new AtomicInteger(1);
public Thread newThread(Runnable r) {
return new Thread(r, "AsyncTask #" + mCount.getAndIncrement());
}
};
// 任务缓存队列
private static final BlockingQueue<Runnable> sPoolWorkQueue =
new LinkedBlockingQueue<Runnable>(128);
public static final Executor THREAD_POOL_EXECUTOR;
static {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
CORE_POOL_SIZE, MAXIMUM_POOL_SIZE, KEEP_ALIVE_SECONDS, TimeUnit.SECONDS,
sPoolWorkQueue, sThreadFactory);
threadPoolExecutor.allowCoreThreadTimeOut(true);// 允许核心线程在闲置时间到达后释放
THREAD_POOL_EXECUTOR = threadPoolExecutor;
}
Android3.1及以下的系统版本,多个AsyncTask任务是通过THREAD_POOL_EXECUTOR并发执行的。继续深挖
// AsyncTask执行的三种状态
public enum Status {
/**
* Indicates that the task has not been executed yet.
*/
PENDING,// 挂起
/**
* Indicates that the task is running.
*/
RUNNING,// 执行
/**
* Indicates that {@link AsyncTask#onPostExecute} has finished.
*/
FINISHED,// 执行结束
}
@MainThread
public final AsyncTask<Params, Progress, Result> executeOnExecutor(Executor exec,
Params... params) {
if (mStatus != Status.PENDING) {// 正在执行或执行结束的任务不能重复调用execute
switch (mStatus) {
case RUNNING:
throw new IllegalStateException("Cannot execute task:"
+ " the task is already running.");
case FINISHED:
throw new IllegalStateException("Cannot execute task:"
+ " the task has already been executed "
+ "(a task can be executed only once)");
}
}
mStatus = Status.RUNNING;// 标识任务执行中
onPreExecute();// 回调子类实现的任务待开始之前的处理
mWorker.mParams = params;
exec.execute(mFuture);
return this;
}
至此,我们已经清晰的了解了AsyncTask中任务的执行过程。
那么,大家是否注意到了,真正提交给线程执行的竟然是FutureTask,为什么呢?根据上文的源码,不难发现,因为FutureTask同时实现了Runnable和Future接口,既能被线程执行,又能提供线程执行任务后的返回结果,还支持任务的取消。
现在,大家最关心的应该是任务的执行结果又是如何反馈到我们的主线程中的呢?我之前一篇文章中写过Android的消息机制Handler,显然就是通过Handler来实现的异步消息处理啦。
InternalHandler
private static class InternalHandler extends Handler {
public InternalHandler() {
super(Looper.getMainLooper());// 主线程的Loope对象,关联了主线程及其消息队列
}
@SuppressWarnings({"unchecked", "RawUseOfParameterizedType"})
@Override
public void handleMessage(Message msg) {
AsyncTaskResult<?> result = (AsyncTaskResult<?>) msg.obj;
switch (msg.what) {
case MESSAGE_POST_RESULT:// 执行结束(正常执行over或者cancel掉)
// There is only one result
result.mTask.finish(result.mData[0]);
break;
case MESSAGE_POST_PROGRESS:// 更新进度
result.mTask.onProgressUpdate(result.mData);
break;
}
}
}
我们通过给Handler指定主线程的Looper对象,即关联上了主线程的消息循环系统,通过该handler发送的任何消息都会被递交到主线程的消息队列内。handler消息obj部分使用AsyncTaskResult类进行了封装,包含了当前的AsyncTask和传递的数据内容。
那么这两种消息都是什么时候发送的呢?细心的同学应该早就注意到了postResult方法,就是调用这个调用Handler向主线程发送的结果数据。
// 构造器中实例化的WorkerRunnable的call方法被调用(即任务被执行,一方面FutureTask的get可以获取执行结果,另外也通过postResult的方式将消息发送给handler)
private Result postResult(Result result) {
@SuppressWarnings("unchecked")
Message message = getHandler().obtainMessage(MESSAGE_POST_RESULT,
new AsyncTaskResult<Result>(this, result));
message.sendToTarget();// 将消息发送给目标Handler
return result;
}
我们注意到当handler获取到了执行结果后,调用了AsyncTask的finish方法,这个方法中发生什么了呢?
private void finish(Result result) {
if (isCancelled()) {// 如果任务被取消,回调onCancelled
onCancelled(result);
} else {// 任务正常执行结束,回调onPostExecute
onPostExecute(result);
}
mStatus = Status.FINISHED;
}
其实从源码分析,如果任务已经执行了,调用cancel并不会真正的把任务结束,而是继续执行,只是改变的是执行之后的回调方法是 onPostExecute还是onCancelled。
最后我们再深入了解一下,任务是如何被取消的呢?
public final boolean cancel(boolean mayInterruptIfRunning) {// 是否停止当前正在运行的任务执行线程
mCancelled.set(true);
return mFuture.cancel(mayInterruptIfRunning);// 通过我们的FutureTask对象的cancel方法,根据参数mayInterruptIfRunning决定内部是否直接interrupt当前线程
}
因为AsyncTask源码在Android版本迭代中发生很多变化,该次源码解剖基于Android 7.1.1。Android 3.0之前有一版本,3.0-4.1之间又有一版本,4.1之后的版本延续至今无大的改动变化。
虽然我们的AsyncTask可以实例化多个对象,但是内部的线程调度sHandler和THREAD_POOL_EXECUTOR(线程池)都属于类变量,被所有实例所共用。
补充Android-29更新(并不一定是在Android-29开始有的更新,再低的版本尚未探究过线程池这一块的改动)
1. 线程池超负荷了怎么办?
设置了拒绝策略,通过一个后备线程池去执行超额的任务,包括执行线程池相关参数都有些许变化如下:
// 执行线程池核心数调整为固定1,核心线程不设置超时
private static final int CORE_POOL_SIZE = 1;
// 执行线程池总量固定为20
private static final int MAXIMUM_POOL_SIZE = 20;
// 后备线程池核心即总量设置为5,核心线程也设置超时机制
private static final int BACKUP_POOL_SIZE = 5;
// 执行线程池和后备线程池超时时间都设置为3s
private static final int KEEP_ALIVE_SECONDS = 3;
sBackupExecutorQueue = new LinkedBlockingQueue<Runnable>();
sBackupExecutor = new ThreadPoolExecutor(
BACKUP_POOL_SIZE, BACKUP_POOL_SIZE, KEEP_ALIVE_SECONDS,
TimeUnit.SECONDS, sBackupExecutorQueue, sThreadFactory);
sBackupExecutor.allowCoreThreadTimeOut(true); // 允许核心线程超时
2. 执行线程池任务缓存上的调整(配合新增的后备线程池)
任务缓存队列由LinkedBlockingQueue(128)
调整为SynchronousQueue
,因为执行线程总量固定为20了,所有基本能hold住瞬时大量任务,线程尚未超过总量时,可以直接开启非核心线程处理,即便线程总量不够用,还有后备线程池,所以一定量的并发还是能撑住的。另外,没有太多任务时,只保留一个核心线程,也不需要后备线程池。
static {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
CORE_POOL_SIZE, MAXIMUM_POOL_SIZE, KEEP_ALIVE_SECONDS, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(), sThreadFactory);
threadPoolExecutor.setRejectedExecutionHandler(sRunOnSerialPolicy);
THREAD_POOL_EXECUTOR = threadPoolExecutor;
}