android AsyncTask 源码分析

2021-05-27  本文已影响0人  Yapple
使用

AsyncTask的使用不是本文的重点,我相信读者也不是来了解这个的,所以就先简单的介绍下它的使用,帮助大家回忆。
AsyncTask的实现:

public class DownloadTask extends AsyncTask<Object, Integer, Boolean> {

    /**
     * 在执行任务之前调用,运行在主线程,可用来做布局更新等
     */
    @Override
    protected void onPreExecute() {
        super.onPreExecute();
    }

    /**
     * 执行后台任务,运行在子线程
     */
    @Override
    protected Boolean doInBackground(Object... objects) {
        return false;
    }

    /**
     * 更新进度,运行在主线程,在doInBackground方法中调用publishProgress(Integer... values)会自动调用该方法
     */
    @Override
    protected void onProgressUpdate(Integer... values) {
        super.onProgressUpdate(values);
    }

    /**
     * 后台任务结束后调用,运行在主线程,参数aBoolean是doInBackground()方法的返回值
     */
    @Override
    protected void onPostExecute(Boolean aBoolean) {
        super.onPostExecute(aBoolean);
    }

    /**
     * 主动调用cancel()方法,或者doInBackground()方法抛出异常
     */
    @Override
    protected void onCancelled() {
        super.onCancelled();
    }
}

调用:

    DownloadTask downloadTask = new DownloadTask();
    downloadTask.execute(object1, object2);
    //这里的参数object1, object2最终会传给doInBackground(Object... objects);
源码思路

先说下整体的实现思路:创建线程池执行耗时任务,通过handler将进度、结果等送回主线程。

细节方面,我们需要研究下,该线程池是怎样的线程池,它的执行逻辑是怎样的:核心线程数,最大线程数,等待队列,拒绝策略等细节。顺便再了解下上面代码中的onPreExecute(),onProgressUpdate()等生命周期是在什么时候执行的。

开始研究

我们就从我们使用的地方开始下手:
DownloadTask downloadTask = new DownloadTask();
downloadTask.execute(object1, object2);

1.下面是其构造方法:
public AsyncTask(@Nullable Looper callbackLooper) {
        mHandler = callbackLooper == null || callbackLooper == Looper.getMainLooper()
            ? getMainHandler()
            : new Handler(callbackLooper);

        mWorker = new WorkerRunnable<Params, Result>() {
            public Result call() throws Exception {
                //设置标志,表示工作任务开始执行
                mTaskInvoked.set(true);
                Result result = null;
                try {
                    Process.setThreadPriority(Process.THREAD_PRIORITY_BACKGROUND);
                    //noinspection unchecked
                    result = doInBackground(mParams);
                    Binder.flushPendingCommands();
                } catch (Throwable tr) {
                    mCancelled.set(true);
                    throw tr;
                } finally {
                    postResult(result);
                }
                return result;
            }
        };

        mFuture = new FutureTask<Result>(mWorker) {
            @Override
            protected void done() {
                try {
                    postResultIfNotInvoked(get());
                } catch (InterruptedException e) {
                    android.util.Log.w(LOG_TAG, e);
                } catch (ExecutionException e) {
                    throw new RuntimeException("An error occurred while executing doInBackground()",
                            e.getCause());
                } catch (CancellationException e) {
                    postResultIfNotInvoked(null);
                }
            }
        };
    }

结构很简单,一共初始化了三个全局变量mHandler,mWorker,mFuture。

mHandler,从上面的代码看,是创建了一个绑定callbackLooper的handler,所以讲道理如果callbackLooper是另一个子线程的Looper对象,它也可是实现两个子线程之间的数据交互。但是,由于该构造方法被@hide隐藏了,我们使用的默认无参构造器,最终会使callbackLooper为null,也就是说mHandler赋值为getMainHandler()绑定了主线程。

下面是该handler实现的handleMessage方法,一个是处理结果,一个是处理进度的,这里就先不展开细说了,后边分析到工作线程执行到各个阶段时再细说。

        @Override
        public void handleMessage(Message msg) {
            AsyncTaskResult<?> result = (AsyncTaskResult<?>) msg.obj;
            switch (msg.what) {
                case MESSAGE_POST_RESULT:
                    // There is only one result
                    result.mTask.finish(result.mData[0]);
                    break;
                case MESSAGE_POST_PROGRESS:
                    result.mTask.onProgressUpdate(result.mData);
                    break;
            }
        }

mWorker,WorkerRunnable是抽象类,implements自Callable接口,对线程池框架比较熟悉的可以知道它将在子线程中调用执行,了解到这点就够了。

在mWorker实现的call()方法中最主要的一行代码doInBackground(mParams);它在这里执行了我们的工作任务,然后在catch异常的时候设置了标志位:mCancelled.set(true),最终在finally中执行了postResult()方法。postResult()的逻辑:

private Result postResult(Result result) {
        //getHandler()会返回mHandler,也就是上面介绍的绑定了主线程Looper对象的Handler
        Message message = getHandler().obtainMessage(MESSAGE_POST_RESULT,
                new AsyncTaskResult<Result>(this, result));
        message.sendToTarget();
        return result;
    }

这就和我们上面讲到的handleMessage对Message的处理联系了起来,最终会执行finish()方法,并在这里调用我们实现的onCancelled()或者onPostExecute()生命周期:

private void finish(Result result) {
        //isCancelled()方法会判断是否是取消导致,
        //比如上面说的执行mWorker.call()时catch异常和我们做活动调用AsyncTask的cancel()方法会使isCancelled()返回ture
        if (isCancelled()) {
            onCancelled(result);
        } else {
            onPostExecute(result);
        }
        mStatus = Status.FINISHED;
    }

WorkerRunnable除了implements Callable接口还添加了个mParams数组,也就是doInBackground(mParams),执行时会传入的参数

private static abstract class WorkerRunnable<Params, Result> implements Callable<Result> {
        Params[] mParams;
}

mFuture FutureTask本篇不做深入的了解,它的目的是获取子线程运算结果和提供子线程的中断方法。上面代码对于mFuture的初始化可以看到它将mWorker封装在了内部,所以以后的操作对象都变为了mFuture。mWork只是提供了call()方法(Callable接口)和参数mParams。

2.execute:

构造方法就是初始化了三个变量,mHandler,mWorker,mFuture。接下来我们看下execute()执行了哪些步奏:downloadTask.execute(object1, object2);

    public final AsyncTask<Params, Progress, Result> execute(Params... params) {
        //内部调用了executeOnExecutor方法
        return executeOnExecutor(sDefaultExecutor, params);
    }
    public final AsyncTask<Params, Progress, Result> executeOnExecutor(Executor exec,
            Params... params) {
        if (mStatus != Status.PENDING) {
            switch (mStatus) {
                case RUNNING:
                    throw new IllegalStateException("Cannot execute task:"
                            + " the task is already running.");
                case FINISHED:
                    throw new IllegalStateException("Cannot execute task:"
                            + " the task has already been executed "
                            + "(a task can be executed only once)");
            }
        }

        mStatus = Status.RUNNING;

        onPreExecute();
        //在这里将mWorker的mParams变量赋值
        mWorker.mParams = params;
        exec.execute(mFuture);

        return this;
    }

可以看到execute()就是调用了executeOnExecutor()方法。executeOnExecutor()的前部分就是判断了当前AsyncTask的状态只有在Status.PENDING的状态才可以正常运行,并且在运行后立即将状态改为了Status.RUNNING,也就是说同一个AsyncTask对象只能execute()一次。

接着就是调用了onPreExecute();也就是我们执行任务前在主线程做一些初始化的操作,比如加载界面。

最后最重要的工作exec.execute(mFuture)来执行我们的工作任务。毋庸置疑,该方法最终会执行到mWorker的call()方法。我们现在要研究的就是这个Executor对象(即调用executeOnExecutor()时传入的sDefaultExecutor),它是怎样的线程池,我们传入的FutureTask对象将被如何调用。

sDefaultExecutor

终于我们要开始研究AsyncTask的核心内容了sDefaultExecutor(其实不是)。为什么说不是呢,因为sDefaultExecutor只是一个任务安排者,并不真正的创建线程执行任务。看下sDefaultExecutor的源码:

    /**
    *sDefaultExecutor最终会指向SerialExecutor对象
    **/
    private static class SerialExecutor implements Executor {
        final ArrayDeque<Runnable> mTasks = new ArrayDeque<Runnable>();
        Runnable mActive;
        //SerialExecutor并没有任何地方创建线程,execute也只是将任务放进ArrayDeque队列中
        //之前我们传入的是FutureTask对象,而这里需要的参数是Runnable对象是因为,    
        //FutureTask也实现了Runnable接口,并将Callable接口(即mWorker)的call()方法在run()方法中调用
        public synchronized void execute(final Runnable r) {
            mTasks.offer(new Runnable() {
                public void run() {
                    try {
                        r.run();
                    } finally {
                        scheduleNext();
                    }
                }
            });
            if (mActive == null) {
                scheduleNext();
            }
        }

        protected synchronized void scheduleNext() {
            if ((mActive = mTasks.poll()) != null) {
                //THREAD_POOL_EXECUTOR才是最终执行任务的线程池
                THREAD_POOL_EXECUTOR.execute(mActive);
            }
        }
    }

ok,我们先看下它是怎样安排任务的。1.将任务runnable稍作包装提交到mTasks中并插入到队列尾部。2.判断mActive是否为null,如果不为null则执行scheduleNext()。

第一次运行AsyncTask时肯定为null,所以会立即执行scheduleNext(),可以看到mTasks.poll()会从队列首部取出任务赋值给mActive,并且通过真正的线程池THREAD_POOL_EXECUTOR去执行。

当任务完成后会在finally中调用scheduleNext()方法去查看mTasks队列中是否有剩余任务,如果有则安排,没有则mActive置为null。保证下次创建新的AsyncTask时,mActive为正确值(因为sDefaultExecutor为static变量,全局只有一个sDefaultExecutor对象),这也表示如果短时间内创建多个AsyncTask,并且每个AsyncTask执行时间还比较长,那后执行AsyncTask.execute()方法的任务将需要等待前面的任务执行完成才能开始。也就是说AsyncTask其实是串行执行的。
若想并行执行也是有方法的,直接调用executeOnExecutor(THREAD_POOL_EXECUTOR, params);跳过sDefaultExecutor的代理。

THREAD_POOL_EXECUTOR

终于到了研究线程池的这一步了

/**
     * An {@link Executor} that can be used to execute tasks in parallel.
     */
    public static final Executor THREAD_POOL_EXECUTOR;

    static {
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
                CORE_POOL_SIZE, MAXIMUM_POOL_SIZE, KEEP_ALIVE_SECONDS, TimeUnit.SECONDS,
                new SynchronousQueue<Runnable>(), sThreadFactory);
        threadPoolExecutor.setRejectedExecutionHandler(sRunOnSerialPolicy);
        THREAD_POOL_EXECUTOR = threadPoolExecutor;
    }

核心线程数:CORE_POOL_SIZE = 1,最大线程数:MAXIMUM_POOL_SIZE = 20,空闲线程持续时间KEEP_ALIVE_SECONDS = 3(秒),SynchronousQueue这是等待队列,线程工厂: sThreadFactory,最后设置了拒绝策略sRunOnSerialPolicy。
SynchronousQueue没有容量,是无缓冲等待队列,是一个不存储元素的阻塞队列。也就是说最多同时执行20个线程任务。再有新的任务则会执行拒绝策略sRunOnSerialPolicy,我们看下sRunOnSerialPolicy的源码:

    private static final int BACKUP_POOL_SIZE = 5;
    private static final RejectedExecutionHandler sRunOnSerialPolicy =
            new RejectedExecutionHandler() {
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            android.util.Log.w(LOG_TAG, "Exceeded ThreadPoolExecutor pool size");
            // As a last ditch fallback, run it on an executor with an unbounded queue.
            // Create this executor lazily, hopefully almost never.
            synchronized (this) {
                if (sBackupExecutor == null) {
                    sBackupExecutorQueue = new LinkedBlockingQueue<Runnable>();
                    sBackupExecutor = new ThreadPoolExecutor(
                            BACKUP_POOL_SIZE, BACKUP_POOL_SIZE, KEEP_ALIVE_SECONDS,
                            TimeUnit.SECONDS, sBackupExecutorQueue, sThreadFactory);
                    sBackupExecutor.allowCoreThreadTimeOut(true);
                }
            }
            sBackupExecutor.execute(r);
        }
    };

很简单,又创建了个备用线程池,只有BACKUP_POOL_SIZE个核心线程(5个),然后核心线程也会超时,毕竟备用线程池,逻辑上来讲需要也不需要长时间保留。sBackupExecutorQueue的容量是Integer.Max,所以说若短时间执行大量任务,则会将后续的任务都添加在sBackupExecutorQueue中,然后等待备用线程的调用,这时候,主线程池的即使结束了任务,也不会去执行新的任务,sBackupExecutorQueue中的任务将由备用线程池的5个线程去执行。这点上没有注意到可能会影响AsyncTask的使用效率:

如下图每隔3秒,子线程的任务完成后,只会有5个新的任务执行(log中的starttask) image.png

总结:

整体来说AsyncTask思路很简单,就是创建线程池,在子线程执行任务前调用onPreExecute(),在子线程中调用doInBackground(),然后执行过程中调用publishProgress()通过handler发送进度到主线程执行onProgressUpdate(),最后执行完成后再通过handler发送Result到主线程执行onPostExecute()或者onCancelled()
需要多注意的就是对于线程池要有一定的了解,否则读起源码来将有些困难。

上一篇下一篇

猜你喜欢

热点阅读