AsyncTask

2020-04-26  本文已影响0人  G_Freedom

AsyncTask是什么?

AsyncTask是Google推出的轻量级的异步处理工具,本质上就是一个封装了线程池和handler的异步框架。

AsyncTask的用处?

实现多线程,在工作线程中处理耗时任务同时会将工作线程中的结果传递到UI线程,不需要主动去创建和操作线程。避免了对线程的维护和管理。

简单用法

// 三个泛型参数分别代表传入的参数类型,任务执行过程需要更新的数据类型,
//任务执行结束返回的结果类型,如果无类型则可以用Void类
AsyncTask<Integer,Integer,Void> asyncTask = new AsyncTask<Integer, Integer, Void>() {
    /**
     * 得到结果,在主线程执行
     * @param aVoid
     */
    @Override
    protected void onPostExecute(Void aVoid) {
        Log.d(TAG, "onPostExecute: >>>");
        super.onPostExecute(aVoid);
    }

    /**
     * 任务内容,在工作线程执行
     * @param integers
     * @return
     */
    @Override
    protected Void doInBackground(Integer... integers) {
        Log.d(TAG, "doInBackground: >>>params: "+Arrays.toString(integers));
        return null;
    }

    /**
     * 任务执行前,在主线程执行
     */
    @Override
    protected void onPreExecute() {
        Log.d(TAG, "onPreExecute: >>");
        super.onPreExecute();
    }

    /**
     * 任务已取消(带结果),在主线程执行
     * @param aVoid
     */
    @Override
    protected void onCancelled(Void aVoid) {
        super.onCancelled(aVoid);
        Log.d(TAG, "onCancelled(有参): >>>");
    }

    /**
     * 任务已取消,在主线程执行
     */
    @Override
    protected void onCancelled() {
        super.onCancelled();
        Log.d(TAG, "onCancelled: >>>");
    }

    /**
     * 指定过程更新的数据,在主线程执行
     * @param values
     */
    @Override
    protected void onProgressUpdate(Integer... values) {
        super.onProgressUpdate(values);
        Log.d(TAG, "onProgressUpdate: "+ Arrays.toString(values));
    }
};
asyncTask.execute(2,3,1);

AsyncTask的优缺点?

优点
1、方便实现异步通信,不需要 Thread + Handler 复杂组合,同时节省资源,内部采用线程池的缓存 + 复用模式,避免了频繁创建和销毁线程带来的系统资源开销。
2、处理单个异步任务简单,可以获取到异步任务的进度。
3、可以通过cancel方法取消还没执行完的AsyncTask。
缺点
1、同时处理多个异步任务会降低效率。
2、如果在列表中大量创建和使用AsyncTask,可能会出现因为异步任务不能尽快完成任务导致一直在新开线程去添加新的任务,会造成滑动不流畅,当开启的线程过多就会引起崩溃问题。
3、如果使用AsyncTask去处理大量的异步任务并且使用的是异步线程池会有可能引起崩溃。崩溃异常:RejectedExecutionException

创建并执行过多的AsyncTask导致创建大量的工作线程崩溃

AsyncTask中有一个全局静态的线程池 -> SDK 28

static {
    ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
            CORE_POOL_SIZE, MAXIMUM_POOL_SIZE, KEEP_ALIVE_SECONDS, TimeUnit.SECONDS,
            sPoolWorkQueue, sThreadFactory);
    threadPoolExecutor.allowCoreThreadTimeOut(true);
    THREAD_POOL_EXECUTOR = threadPoolExecutor;
}

线程池的配置:

//当前虚拟机可用处理器数量
private static final int CPU_COUNT = Runtime.getRuntime().availableProcessors();
// We want at least 2 threads and at most 4 threads in the core pool,
// preferring to have 1 less than the CPU count to avoid saturating
// the CPU with background work
//我们希望在核心池中至少有2个线程,最多4个线程,宁愿有1小于CPU计数,以避免饱和
//具有后台工作的CPU
//核心线程数量,取决于获取到的CPU核数,最多4个线程数量
private static final int CORE_POOL_SIZE = Math.max(2, Math.min(CPU_COUNT - 1, 4));
//最大可用线程数,可用处理器数量的两倍+1
private static final int MAXIMUM_POOL_SIZE = CPU_COUNT * 2 + 1;
//空闲线程超时时间
private static final int KEEP_ALIVE_SECONDS = 30;
//线程任务队列容量128
private static final BlockingQueue<Runnable> sPoolWorkQueue =
        new LinkedBlockingQueue<Runnable>(128);

当一个任务通过execute方法把任务加入到线程池时;这是ThreadPoolExecutor特性;

//ThreadPoolExecutor添加任务
public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    /*
     * Proceed in 3 steps:
     *
     * 1. If fewer than corePoolSize threads are running, try to
     * start a new thread with the given command as its first
     * task.  The call to addWorker atomically checks runState and
     * workerCount, and so prevents false alarms that would add
     * threads when it shouldn't, by returning false.
     * 如果运行的线程小于corePoolSize,请尝试执行以下操作使用给定的命令启动一个新线程
     * 的任务。对addWorker的调用将自动检查runState和workerCount,这样可以防止添加错误警报
     * 当它不应该线程,通过返回false。
     *
     * 2. If a task can be successfully queued, then we still need
     * to double-check whether we should have added a thread
     * (because existing ones died since last checking) or that
     * the pool shut down since entry into this method. So we
     * recheck state and if necessary roll back the enqueuing if
     * stopped, or start a new thread if there are none.
     * 如果一个任务可以成功排队,那么我们仍然需要再次检查我们是否应该添加一个线程
     * (因为上次检查后现有的已经死了)进入此方法后,池关闭。所以我们重新检查状态,
     * 如果需要,回滚排队停止,或者如果没有新线程,则启动新线程
     *
     * 3. If we cannot queue task, then we try to add a new
     * thread.  If it fails, we know we are shut down or saturated
     * and so reject the task.
     * 如果无法对任务排队,则尝试添加新线程。
     * 如果它失败了,我们知道我们被关闭或饱和,因此拒绝任务。
     */
    int c = ctl.get();
    //1
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    //2
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    //3 -> 如果缓存数量超过了128,会抛出异常rejectedExecution
    else if (!addWorker(command, false))
        reject(command);
}

//异常
final void reject(Runnable command) {
    handler.rejectedExecution(command, this);
}

也就是:处理任务的优先级为:
核心线程corePoolSize、任务队列workQueue、最大线程maximumPoolSize,如果三者都满了,使用handler处理被拒绝的任务(一般为抛出java.util.concurrent.RejectedExecutionException异常)。
ThreadPoolExecutor这个线程池都是进程范围共享的,static的,所以当任务线程超过了缓存队列的容量时也就是128个缓存数的情况下会抛出异常rejectedExecution。这个前提是使用异步线程池。不过从Android3.0开始AsyncTask默认使用的是同步线程池串行执行,不会出现这个问题。每一个AsyncTask一次只能执行一个任务,也就是execute只能调用一次,在任务结束之前不能再次调用,要么就会报错。
如果手动切到到异步线程池执行,那最多执行corePoolSize数量的线程任务,其他任务进入阻塞状态。

public final AsyncTask<Params, Progress, Result> executeOnExecutor(Executor exec,
        Params... params) {
    if (mStatus != Status.PENDING) {
        switch (mStatus) {
            case RUNNING:
                throw new IllegalStateException("Cannot execute task:"
                        + " the task is already running.");
            case FINISHED:
                throw new IllegalStateException("Cannot execute task:"
                        + " the task has already been executed "
                        + "(a task can be executed only once)");
        }
    }
...
}

同步线程池:

private static class SerialExecutor implements Executor {
    //使用一个集合来保证任务顺序执行,当存在多个AsyncTask的时候,就是串行执行
    //不能保证顺序
    final ArrayDeque<Runnable> mTasks = new ArrayDeque<Runnable>();
    Runnable mActive;

    public synchronized void execute(final Runnable r) {
        mTasks.offer(new Runnable() {
            public void run() {
                try {
                    r.run();
                } finally {
                    scheduleNext();
                }
            }
        });
        if (mActive == null) {
            scheduleNext();
        }
    }

    protected synchronized void scheduleNext() {
        if ((mActive = mTasks.poll()) != null) {
            THREAD_POOL_EXECUTOR.execute(mActive);
        }
    }
}

因为一次性只能执行corePoolSize(API 28 最大核心数量是4)数量的任务,其他的线程将进入阻塞状态,这将极大地降低执行效率,这也就是大家推荐使用AsyncTask只用来执行单个耗时任务的原因。
内存泄漏
如果你自定义一个AsyncTask并且传入了一个Activity的Context上下文环境,就很有可能引起这个问题,AsyncTask和Activity的生命周期无关,那么当finish后,AsyncTask依旧存在,而他持有着Activity的引用导致Activity无法被垃圾回收。
同时如果使用了非静态匿名内部类来实现AsyncTask,由于Java内部类的特点,他同样会持有当前Activity的引用造成内存泄漏。
cancel方法问题
传入的参数表示当前任务执行时是否可以取消。但是当你的doInBackground方法中执行一个循环或者一个IO流读写任务,即使你传入了true,改方法也无法取消这个任务的执行。区别在于调用这个方法后,doInBackground执行完成时会调用onCancelled方法,而不是onPostExecute方法,所以cancel无法保证任务能够被及时取消。

mWorker = new WorkerRunnable<Params, Result>() {
    public Result call() throws Exception {
        mTaskInvoked.set(true);
        Result result = null;
        try {
            Process.setThreadPriority(Process.THREAD_PRIORITY_BACKGROUND);
            //noinspection unchecked
            //这个方法执行完后才回去判断是否取消任务的执行
            //所以cancel无法保证任务能够及时被取消
            result = doInBackground(mParams);
            Binder.flushPendingCommands();
        } catch (Throwable tr) {
            mCancelled.set(true);
            throw tr;
        } finally {
            //也就是只要执行到doInBackground那必然会执行到这个方法
            postResult(result);
        }
        return result;
    }
};
private Result postResult(Result result) {
    @SuppressWarnings("unchecked")
    Message message = getHandler().obtainMessage(MESSAGE_POST_RESULT,
            new AsyncTaskResult<Result>(this, result));
    message.sendToTarget();
    return result;
}
private static class InternalHandler extends Handler {
    public InternalHandler(Looper looper) {
        super(looper);
    }

    @SuppressWarnings({"unchecked", "RawUseOfParameterizedType"})
    @Override
    public void handleMessage(Message msg) {
        AsyncTaskResult<?> result = (AsyncTaskResult<?>) msg.obj;
        switch (msg.what) {
            case MESSAGE_POST_RESULT:
                // There is only one result
                result.mTask.finish(result.mData[0]);
                break;
            case MESSAGE_POST_PROGRESS:
                result.mTask.onProgressUpdate(result.mData);
                break;
        }
    }
}
private void finish(Result result) {
    //这个时候只能在onPostExecute之前才能取消,不过任务都已经做完了,结果都拿到了
    if (isCancelled()) {
        onCancelled(result);
    } else {
        onPostExecute(result);
    }
    mStatus = Status.FINISHED;
}

AsyncTask的原理?

一、创建AsyncTask

AsyncTask<String,Integer,Boolean> asyncTask = new AsyncTask();

构造方法

public AsyncTask(@Nullable Looper callbackLooper) {
    //创建一个主线程的Handler,用来把结果和进入返回到主线程的回调中
    mHandler = callbackLooper == null || callbackLooper == Looper.getMainLooper()
        ? getMainHandler()
        : new Handler(callbackLooper);
    //处理异步任务
    mWorker = new WorkerRunnable<Params, Result>() {
        public Result call() throws Exception {
            mTaskInvoked.set(true);
            Result result = null;
            try {
                Process.setThreadPriority(Process.THREAD_PRIORITY_BACKGROUND);
                //noinspection unchecked
                //执行此回调
                result = doInBackground(mParams);
                Binder.flushPendingCommands();
            } catch (Throwable tr) {
                mCancelled.set(true);
                throw tr;
            } finally {
                postResult(result);
            }
            return result;
        }
    };
    //一个可以取消的任务
    mFuture = new FutureTask<Result>(mWorker) {
        @Override
        protected void done() {
            try {
                postResultIfNotInvoked(get());
            } catch (InterruptedException e) {
                android.util.Log.w(LOG_TAG, e);
            } catch (ExecutionException e) {
                throw new RuntimeException("An error occurred while executing doInBackground()",
                        e.getCause());
            } catch (CancellationException e) {
                postResultIfNotInvoked(null);
            }
        }
    };
}

初始化线程池和相关配置

//当前虚拟机可用处理器数量
private static final int CPU_COUNT = Runtime.getRuntime().availableProcessors();
// We want at least 2 threads and at most 4 threads in the core pool,
// preferring to have 1 less than the CPU count to avoid saturating
// the CPU with background work
//我们希望在核心池中至少有2个线程,最多4个线程,宁愿有1小于CPU计数,以避免饱和
//具有后台工作的CPU
//核心线程数量,取决于获取到的CPU核数,最多4个线程数量
private static final int CORE_POOL_SIZE = Math.max(2, Math.min(CPU_COUNT - 1, 4));
//最大可用线程数,可用处理器数量的两倍+1
private static final int MAXIMUM_POOL_SIZE = CPU_COUNT * 2 + 1;
//空闲线程超时时间 30s
private static final int KEEP_ALIVE_SECONDS = 30;
//线程任务队列容量128
private static final BlockingQueue<Runnable> sPoolWorkQueue =
        new LinkedBlockingQueue<Runnable>(128);
      
/**
 * An {@link Executor} that can be used to execute tasks in parallel.
 * 可以用来并行执行任务的{@link Executor}。
 * 异步线程池,并行执行任务
 */
public static final Executor THREAD_POOL_EXECUTOR;  
static {
    //静态代码块初始化线程池并配置相关参数
    ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
            CORE_POOL_SIZE, MAXIMUM_POOL_SIZE, KEEP_ALIVE_SECONDS, TimeUnit.SECONDS,
            sPoolWorkQueue, sThreadFactory);
    threadPoolExecutor.allowCoreThreadTimeOut(true);
    THREAD_POOL_EXECUTOR = threadPoolExecutor;
}

/**
 * An {@link Executor} that executes tasks one at a time in serial
 * order.  This serialization is global to a particular process.
 * 同步线程池,串行执行任务,内部维护一个线程队列来保证顺序执行任务
 */
public static final Executor SERIAL_EXECUTOR = new SerialExecutor();

二、执行AsyncTask

//默认串行执行任务
asyncTask.execute("result");
or
//使用并行执行任务
asyncTask.executeOnExecutor(AsyncTask.THREAD_POOL_EXECUTOR,"result");

#AsyncTask
@MainThread -> 在主线程中执行
public final AsyncTask<Params, Progress, Result> execute(Params... params) {
    return executeOnExecutor(sDefaultExecutor, params);
}

最终会执行到

@MainThread -> 在主线程中执行
public final AsyncTask<Params, Progress, Result> executeOnExecutor(Executor exec,
        Params... params) {
    if (mStatus != Status.PENDING) {
        switch (mStatus) {
            case RUNNING:
                throw new IllegalStateException("Cannot execute task:"
                        + " the task is already running.");
            case FINISHED:
                throw new IllegalStateException("Cannot execute task:"
                        + " the task has already been executed "
                        + "(a task can be executed only once)");
        }
    }
    mStatus = Status.RUNNING;
    //最开始会执行此方法回调,告诉我们任务执行前的准备工作。
    onPreExecute();
    mWorker.mParams = params;
    //通过此方法开启线程执行任务
    exec.execute(mFuture);
    return this;
}

可以看到一个AsyncTask一次只能执行一个任务,在任务结束之前如果再次调用execute或者executeOnExecutor方法会抛出异常。
exec -> 默认的是SerialExecutor,也可以主动设置THREAD_POOL_EXECUTOR -> ThreadPoolExecutor
mFuture -> FutureTask<WorkerRunnable> -> mFuture就是mWorker的一个包装类,它也具体实现了Future的一些任务操作接口,比如取消任务,mFuture重写了done()方法,应该是考虑到AsyncTask的get()和cancel()方法内部出异常时对结果的处理。
WorkerRunnable -> 这个就是真正执行异步任务的
默认执行的是串行任务

private static class SerialExecutor implements Executor {
    //使用一个集合来保证任务顺序执行,当存在多个AsyncTask的时候,就是串行执行
    //不能保证顺序
    final ArrayDeque<Runnable> mTasks = new ArrayDeque<Runnable>();
    Runnable mActive;

    public synchronized void execute(final Runnable r) {
        // 入队一个runnable,对原始的runnable加了点修饰
        mTasks.offer(new Runnable() {
            public void run() {
                try {
                    r.run();
                } finally {
                    // 任务执行后,要检查是否有下一个runnable需要执行
                    scheduleNext();
                }
            }
        });
        if (mActive == null) {
            scheduleNext();
        }
    }
    //检查是否有下一个runnable需要执行,如果有,则交给另一个线程池执行
    protected synchronized void scheduleNext() {
        if ((mActive = mTasks.poll()) != null) {
            //最终通过此方法把任务放入到线程池中
            THREAD_POOL_EXECUTOR.execute(mActive);
        }
    }
}

并行任务就是直接使用调用放入到线程池中

THREAD_POOL_EXECUTOR.execute(mActive);
//也就是
executeOnExecutor(){
  //exec这就是THREAD_POOL_EXECUTOR
  exec.execute(mFuture);
}

线程池添加任务,根据执行规则分为以下几点:

//ThreadPoolExecutor添加任务
public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    /*
     * Proceed in 3 steps:
     *
     * 1. If fewer than corePoolSize threads are running, try to
     * start a new thread with the given command as its first
     * task.  The call to addWorker atomically checks runState and
     * workerCount, and so prevents false alarms that would add
     * threads when it shouldn't, by returning false.
     * 如果运行的线程小于corePoolSize,请尝试执行以下操作使用给定的命令启动一个新线程
     * 的任务。对addWorker的调用将自动检查runState和workerCount,这样可以防止添加错误警报
     * 当它不应该线程,通过返回false。
     *
     * 2. If a task can be successfully queued, then we still need
     * to double-check whether we should have added a thread
     * (because existing ones died since last checking) or that
     * the pool shut down since entry into this method. So we
     * recheck state and if necessary roll back the enqueuing if
     * stopped, or start a new thread if there are none.
     * 如果一个任务可以成功排队,那么我们仍然需要再次检查我们是否应该添加一个线程
     * (因为上次检查后现有的已经死了)进入此方法后,池关闭。所以我们重新检查状态,
     * 如果需要,回滚排队停止,或者如果没有新线程,则启动新线程
     *
     * 3. If we cannot queue task, then we try to add a new
     * thread.  If it fails, we know we are shut down or saturated
     * and so reject the task.
     * 如果无法对任务排队,则尝试添加新线程。
     * 如果它失败了,我们知道我们被关闭或饱和,因此拒绝任务。
     */
    int c = ctl.get();
    //1
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    //2
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    //3 -> 如果缓存数量超过了128,会抛出异常rejectedExecution
    else if (!addWorker(command, false))
        reject(command);
}

//异常
final void reject(Runnable command) {
    handler.rejectedExecution(command, this);
}

缓存池的容量最多128,如果大于此数量就会抛出异常java.util.concurrent.RejectedExecutionException拒绝再往里添加新的任务。
至此,任务开始执行

mWorker = new WorkerRunnable<Params, Result>() {
    public Result call() throws Exception {
        mTaskInvoked.set(true);
        Result result = null;
        try {
            Process.setThreadPriority(Process.THREAD_PRIORITY_BACKGROUND);
            //noinspection unchecked
            result = doInBackground(mParams);
            Binder.flushPendingCommands();
        } catch (Throwable tr) {
            mCancelled.set(true);
            throw tr;
        } finally {
            postResult(result);
        }
        return result;
    }
};
//Handler调用此方法通知主线程任务执行完毕
private Result postResult(Result result) {
    @SuppressWarnings("unchecked")
    Message message = getHandler().obtainMessage(MESSAGE_POST_RESULT,
            new AsyncTaskResult<Result>(this, result));
    message.sendToTarget();
    return result;
}

@Override
protected ReusableBitmap doInBackground(Void... params) {
    // enqueue the 'onDecodeBegin' signal on the main thread
    //执行异步任务的同时回调相关进度
    publishProgress();
    return decode();
}

@WorkerThread
protected final void publishProgress(Progress... values) {
    if (!isCancelled()) {
        //进度的回调会通过主线程的Handler发回给主线程
        getHandler().obtainMessage(MESSAGE_POST_PROGRESS,
                new AsyncTaskResult<Progress>(this, values)).sendToTarget();
    }
}

call()方法:首先mTaskInvoked是一个AtomicBoolean对象,它能保证线程安全地更新boolean值,mTaskInvoked.set(true)表示该AsyncTask对象已被调用。然后就是设置当前线程的优先级Process.setThreadPriority(Process.THREAD_PRIORITY_BACKGROUND)。然后就是调用用户实现的doInBackground(mParams)方法。然后Binder.flushPendingCommands();是一个native方法,应该是重新调整线程优先级的。然后是捕获异常,最后是通过postResult(result)提交结果到主线程。
执行完毕

//Handler调用此方法通知主线程任务执行完毕
private Result postResult(Result result) {
    @SuppressWarnings("unchecked")
    Message message = getHandler().obtainMessage(MESSAGE_POST_RESULT,
            new AsyncTaskResult<Result>(this, result));
    message.sendToTarget();
    return result;
}

静态内部类生成了Handler

private static class InternalHandler extends Handler {
    public InternalHandler(Looper looper) {
        super(looper);
    }

    @SuppressWarnings({"unchecked", "RawUseOfParameterizedType"})
    @Override
    public void handleMessage(Message msg) {
        AsyncTaskResult<?> result = (AsyncTaskResult<?>) msg.obj;
        switch (msg.what) {
            case MESSAGE_POST_RESULT:
                // There is only one result
                //通知任务结束
                result.mTask.finish(result.mData[0]);
                break;
            case MESSAGE_POST_PROGRESS:
                //通知进度
                result.mTask.onProgressUpdate(result.mData);
                break;
        }
    }
}

private void finish(Result result) {
    if (isCancelled()) {
        //取消任务
        onCancelled(result);
    } else {
        //结束任务并获得结果
        onPostExecute(result);
    }
    mStatus = Status.FINISHED;
}

至此AsyncTask的执行到此为止。
可以发现AsyncTask中存在两个线程池:
ThreadPoolExecutor:用来执行任务
SerialExecutor:用来处理任务列表

AsyncTask的问题?

为什么用两个线程池?
因为AsyncTask是设计为串行执行任务的,所以另外最好需要一个线程池负责任务的排队。

上一篇下一篇

猜你喜欢

热点阅读