Android开发Android源码之路Android开发

Android源码之路(二、AsyncTask)

2018-07-25  本文已影响5人  CodeInfo

参考

https://www.baidu.com/link?url=QNRznJEBT25k0bpgVD3bOniOia2W85eiPIWrS93YFknyrHoFDGrJVtoax2ZYpiiErtRW7VD-sNEgCRNespIhSK&wd=&eqid=9a27957100030dcd000000065aac77c0

https://www.jianshu.com/p/79cc3c5fc9a3

http://blog.csdn.net/majihua817/article/details/51658465

以下用法和源码分析基于android-27(即版本android 8.1.0)

要点总结:

1.进程所有AsyncTask默认并不是并行执行的,通常默认的 使用execute执行 的AsyncTask是排队按顺序执行的,同一时间内只有一个AsyncTask会运行;

2.进程中使用executeOnExecutor(Executor exec,Params...params)根据传入的线程池效果,可以实现同一时间多个AsyncTask异步执行的效果;

3.常用的回调方法有:onPreExecute()、doInBackground(Params... params)、onProgressUpdate(Progress... values)、onPostExecute(Result result)

4.执行过一个次或者正在执行的AsyncTask不能再次通过execute或者executeOnExecutor进行重复执行

5.使用executeOnExecutor并行执行的话,根据传入的线程池的设置会有队列大小的限制,通常直接使用AsyncTask中AsyncTask.THREAD_POOL_EXECUTOR这个线程池的话:

//cpu核数,假设4
private static final int CPU_COUNT = Runtime.getRuntime().availableProcessors(); 
//核心线程数,3
private static final int CORE_POOL_SIZE = Math.max(2, Math.min(CPU_COUNT - 1, 4));
//最大同时执行的线程数9   
private static final int MAXIMUM_POOL_SIZE = CPU_COUNT * 2 + 1;
//空闲线程超过30秒被回收
private static final int KEEP_ALIVE_SECONDS = 30;               
//等待队列允许的最大数量为128
private static final BlockingQueue<Runnable> sPoolWorkQueue =new LinkedBlockingQueue<Runnable>(128);
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
        CORE_POOL_SIZE, MAXIMUM_POOL_SIZE, KEEP_ALIVE_SECONDS, TimeUnit.SECONDS,
        sPoolWorkQueue, sThreadFactory);
threadPoolExecutor.allowCoreThreadTimeOut(true);
THREAD_POOL_EXECUTOR = threadPoolExecutor;

(假设终端cpu数目为4)因为该线程池运行同时运行的最大线程数为9,等待队列最大为128,如果每一个asynctask的doInBackground中耗时非常久,一次性添加超过9+128=137个task的话(比如listview中一次性创建),那么将会引起异常java.util.concurrent.RejectedExecutionException

6.使用execute的话,因为有队列缓存(没大小限制,除非内存爆掉),串行执行,所以不存在线程池中任务超出大小的情况

7.AsyncTaskd的cancel方法只是简单把标志位改为true,最后使用Thread.interrupt去中断线程执行,但是这并不保证会马上停止执行,所以可以在doInBackground中使用isCancelled来判断任务是否被取消

要点解释

(来自参考资料https://www.jianshu.com/p/79cc3c5fc9a3)

在Android 1.5刚引入的时候,AsyncTask的execute是串行执行的;到了Android 1.6直到Android 2.3.2,又被修改为并行执行了,这个执行任务的线程池就是THREAD_POOL_EXECUTOR,因此在一个进程内,所有的AsyncTask都是并行执行的;但是在Android 3.0以后,如果你使用execute函数直接执行AsyncTask,那么这些任务是串行执行的。因此结论就来了:Android 3.0以上,AsyncTask默认并不是并行执行的。

由于同一个进程的AsyncTask都是在一个线程池中,如果默认是并行的,那么多个线程同时执行多个task中的doInBackground操作,而doInBackground又访问修改相同的资源,这边往往会忽略对资源的同步处理问题,经常会导致不可预料的错误结果,为此默认情况下使用execute执行的AsyncTask默认为串行执行;

如果处理好资源同步问题,可以使用executeOnExecutor传入合适的线程池,来达到AsyncTask并行执行的效果;

优缺点总结

优点:

缺点(参考http://blog.csdn.net/goodlixueyong/article/details/45895997):

使用方法

声明MyAsyncTask继承并实现AsyncTask相关回调:

    public class MyAsyncTask extends AsyncTask<String, Integer, Boolean>{
        String TAG="MyAsyncTask";
        public MyAsyncTask(String tag) {
            super();
            this.TAG=tag;
        }
        //doInBackground开始之前调用,ui线程,可用于界面上的一些初始化操作,比如弹出下载对话框
        @Override
        protected void onPreExecute() {
            Log.d(TAG,"onPreExecute "+Thread.currentThread());
            Toast.makeText(context,"onPreExecute",Toast.LENGTH_SHORT).show();
        }
        //用于执行后台任务,非ui线程,用于处理耗时任务,下载,数据处理等
        @Override
        protected Boolean doInBackground(String... params) {
            Log.d(TAG,"doInBackground "+Thread.currentThread());
            int count = params.length;
            for(int i=0;i<count;i++){
            //判断任务是否取消,取消则停止,用户调用AsyncTask.cancel()后设置该标志位
            //调用AsyncTask.cancel()并不能直接停止一个asynctask,
           // 需要在doInBackground/onProgressUpdate/onPreExecute手动判断该标志位
                if(isCancelled()){
                    return false
                }
                String p = params[i];
                Log.d(TAG,"doInBackground:"+p);
                publishProgress(i);
            }
            return true;
        }

        //当后台任务doInBackground调用了publishProgress(Progress...)方法后,这个方法会被调用
        //用于执行处理过程中的进度更新之类,ui线程,可用于更新下载对话框的进度条
        @Override
        protected void onProgressUpdate(Integer... values) {
            Log.d(TAG,"onProgressUpdate:"+values[0]+"  "+Thread.currentThread());
            Toast.makeText(context,"onProgressUpdate:"+values[0],Toast.LENGTH_SHORT).show();
        }

        //当后台任务doInBackground执行完毕并通过return语句进行返回时,这个方法就会被调用
        //ui线程,可用于界面更新处理完成
        @Override
        protected void onPostExecute(Boolean aBoolean) {
            Log.d(TAG,"onPostExecute:"+aBoolean+"  "+Thread.currentThread());
            Toast.makeText(context,"onPostExecute",Toast.LENGTH_SHORT).show();
        }


        //AsyncTaskd的cancel方法只是简单把标志位改为true,最后使用Thread.interrupt去中断线程执行,但是这并不保证会马上停止执行,所以可以在doInBackground中使用isCancelled来判断任务是否被取消
        //ui线程,可用于取消后关闭ui界面提示等
        @Override
        protected void onCancelled(Boolean aBoolean) {
            Log.d(TAG,"onCancelled:"+aBoolean+"  "+Thread.currentThread());
        }

    }

创建相关实例并执行AsyncTask:

测试一:普通使用

    Log.d(TAG,"btn1 start");
     MyAsyncTask myAsyncTask =new MyAsyncTask("task1"); //创建AsyncTask子类实例
     myAsyncTask.execute("参数1","参数2");      //开始执行
     Log.d(TAG,"btn1 end");
    
    **日志输出**:
    btn1 start                              
    onPreExecute Thread[main,5,main]      //可以看见调用execute后,onPreExecute立马执行
    btn1 end
    doInBackground Thread[AsyncTask #1,5,main]   
    doInBackground:参数1
    doInBackground:参数2
    //doInBackground调用publishProgress后,由handler触发onProgressUpdate,与doInBackground为2个线程异步执行
    onProgressUpdate:0  Thread[main,5,main]  
    onProgressUpdate:1  Thread[main,5,main]
    onPostExecute:true  Thread[main,5,main] //全部操作完成后调用

测试二:多个asyncTask使用验证串行执行

    Log.d(TAG,"btn1 start");
    MyAsyncTask myAsyncTask =new MyAsyncTask("task1");
    MyAsyncTask myAsyncTask2 =new MyAsyncTask("task2");
    myAsyncTask.execute("参数1","参数2");
    myAsyncTask2.execute("参数1","参数2");
    Log.d(TAG,"btn1 end");
                    
    日志输出:
    btn1 start
    //可以看见各自调用execute后,task1,task2的onPreExecute立马执行,互不影响
    task1: onPreExecute Thread[main,5,main] 
    task2: onPreExecute Thread[main,5,main]
    btn1 end
    task1: doInBackground Thread[AsyncTask #1,5,main] //doInBackground是按序执行的
    task1: doInBackground:参数1
    task1: doInBackground:参数2
    task2: doInBackground Thread[AsyncTask #2,5,main]
    task2: doInBackground:参数1
    task2: doInBackground:参数2
    //onProgressUpdate和onProgressUpdate也是按序执行的
    //这边看到可能存在后续asyncTask执行doInBackground过程中,
    前面的task才刚触发onProgressUpdate和onPostExecute
    task1: onProgressUpdate:0  Thread[main,5,main]   
    task1: onProgressUpdate:1  Thread[main,5,main]  
    task1: onPostExecute:true  Thread[main,5,main]
    task2: onProgressUpdate:0  Thread[main,5,main]
    task2: onProgressUpdate:1  Thread[main,5,main]
    task2: onPostExecute:true  Thread[main,5,main]

测试三:并行执行:

MyAsyncTask myAsyncTask =new MyAsyncTask("task1");
    MyAsyncTask myAsyncTask2 =new MyAsyncTask("task2");
    myAsyncTask.executeOnExecutor(AsyncTask.THREAD_POOL_EXECUTOR,"参数1","参数2");
    myAsyncTask2.executeOnExecutor(AsyncTask.THREAD_POOL_EXECUTOR,"参数1","参数2");
    日志输出(doInBackground 添加Thread.sleep方便模拟):
    btn1 start
    task1: onPreExecute Thread[main,5,main]
    task2: onPreExecute Thread[main,5,main]
    //task1与task2的doInBackground部分交替执行
    task1: doInBackground Thread[AsyncTask #1,5,main]
    task1: doInBackground:参数1                             
    task2: doInBackground Thread[AsyncTask #2,5,main]
    task2: doInBackground:参数1                             
    btn1 end
    task1: onProgressUpdate:0  Thread[main,5,main]
    task2: onProgressUpdate:0  Thread[main,5,main]
    task1: doInBackground:参数2
    task2: doInBackground:参数2
    task1: onProgressUpdate:1  Thread[main,5,main]
    task2: onProgressUpdate:1  Thread[main,5,main]
    task1: onPostExecute:true  Thread[main,5,main]
    task2: onPostExecute:true  Thread[main,5,main]

从测试中可以看出onPreExecute在各自AsyncTask调用execute后立即执行,后续源码分析可知道onPreExecute不由handler触发,否则可能与doInBackground执行时序有差别;

测试二中:
后续从源码分析可以知道使用execute触发的AsyncTask中的doInBackground部分实际上是在线程池中排队执行的,所以只有一个任务的doInBackground执行完,才会获取下一个任务的doInBackground进行执行;

从源码分析可以知道onProgressUpdate、onPostExecute是通过handler消息控制的,本身也是按序执行的;多个AsyncTask的不同的生命周期步骤其实会有穿插;

从日志输出和源码原理可知,AsyncTask的按序执行相对来讲有一定的不足,不会等待一个AsyncTask完整执行完后onPreExecute -> doInBackground--> (onProgressUpdate)->onPostExecute,后下一个AsyncTask才允许开始它的生命周期;

测试三中:
验证了执行线程池并行执行的效果


源码分析

AsyncTask开始执行工作的入口函数为AsyncTask.execute,所以从该方法入手查看

AsyncTask源码

execute函数

@MainThread
public final AsyncTask<Params, Progress, Result> execute(Params... params) {
    return executeOnExecutor(sDefaultExecutor, params);
}

注解上也表明该方法运行需要在ui线程运行,为什么需要呢?此为问题一;

这边使用execute触发会使用默认的线程池SerialExecutor sDefaultExecutor,
该线程池实际上只起到一个队列控制,保存task依次按顺序执行;
假设需要并发执行AsyncTask,则可以直接使用executeOnExecutor方法
传入合适的线程池就可以达到效果;

接着查看调用的executeOnExecutor所执行的操作
executeOnExecutor函数
@MainThread
public final AsyncTask<Params, Progress, Result> executeOnExecutor(Executor exec,
        Params... params) {
    if (mStatus != Status.PENDING) {                          
        switch (mStatus) {
            case RUNNING:
                throw new IllegalStateException("Cannot execute task:"
                        + " the task is already running.");
            case FINISHED:
                throw new IllegalStateException("Cannot execute task:"
                        + " the task has already been executed "
                        + "(a task can be executed only once)");
        }
    }

    mStatus = Status.RUNNING;

    onPreExecute();                                         

    mWorker.mParams = params;
    exec.execute(mFuture);

    return this;
}
    mWorker.mParams = params;
    exec.execute(mFuture);

然后将传入的params参数赋值给mWorker.mParams(params后续会传递给doInBackground进行处理),调用线程池exec处理mFuture;

这边需要对mWorker、mFuture和线程池exec进行分析,首先对于exec可以看出传入的是线程池SerialExecutor

    private static volatile Executor sDefaultExecutor = SERIAL_EXECUTOR;
    public static final Executor SERIAL_EXECUTOR = new SerialExecutor();

对于线程池SerialExecutor

private static final int CPU_COUNT = Runtime.getRuntime().availableProcessors();
// We want at least 2 threads and at most 4 threads in the core pool,
// preferring to have 1 less than the CPU count to avoid saturating
// the CPU with background work
private static final int CORE_POOL_SIZE = Math.max(2, Math.min(CPU_COUNT - 1, 4));
private static final int MAXIMUM_POOL_SIZE = CPU_COUNT * 2 + 1;
private static final int KEEP_ALIVE_SECONDS = 30;

private static final ThreadFactory sThreadFactory = new ThreadFactory() {
    private final AtomicInteger mCount = new AtomicInteger(1);

    public Thread newThread(Runnable r) {
        return new Thread(r, "AsyncTask #" + mCount.getAndIncrement());
    }
};

//实际执行任务的线程池,注意是静态的,所以AsyncTask共用
public static final Executor THREAD_POOL_EXECUTOR;
static {
    ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
            CORE_POOL_SIZE, MAXIMUM_POOL_SIZE, KEEP_ALIVE_SECONDS, TimeUnit.SECONDS,
            sPoolWorkQueue, sThreadFactory);
    threadPoolExecutor.allowCoreThreadTimeOut(true);
    THREAD_POOL_EXECUTOR = threadPoolExecutor;
}
    
private static class SerialExecutor implements Executor {
        final ArrayDeque<Runnable> mTasks = new ArrayDeque<Runnable>();
        Runnable mActive;

        public synchronized void execute(final Runnable r) {
            mTasks.offer(new Runnable() {
                public void run() {
                    try {
                        r.run();        //即调用 FutureTask mFuture.run()方法    
                    } finally {
                        scheduleNext();
                    }
                }
            });
            if (mActive == null) {
                scheduleNext();
            }
        }

        protected synchronized void scheduleNext() {
            if ((mActive = mTasks.poll()) != null) {
            //THREAD_POOL_EXECUTOR为实际执行任务的线程池
                THREAD_POOL_EXECUTOR.execute(mActive);
            }
        }
    }

可以看出该SerialExecutor在被调用execute后,会将一个任务添加到队列ArrayDeque中(即前面exec.execute(mFuture)),execute传入的是实现了Runnable接口的类FutureTask的实例mFuture,SerialExecutor的主要作用实际上时维护任务的队列,按顺序取出任务交给实际执行任务的线程池THREAD_POOL_EXECUTOR

第一个执行时mActive == null所以会调用scheduleNext,(按先进先出)从队列中拿出一个任务交给线程池THREAD_POOL_EXECUTOR进行执行:

  public void run() {
        try {
            r.run();        //即调用 FutureTask mFuture.run()方法    
        } finally {
            scheduleNext(); //执行完毕后,获取下一个任务进行处理
        }
    }

执行的即为传入的mFuture.run()方法,执行完成后会再次调用scheduleNext获取并执行下一个AsyncTask任务。
下面看一下FutureTask.run()方法所做的操作

FutureTask类

//FutureTask类构造函数,这边传入的实际上是实现了Callable接口的WorkerRunnable类对象mWorker
//mWorker与mFuture的关联在AsyncTask创建时构造函数中完成
public FutureTask(Callable<V> callable) {
    if (callable == null)
        throw new NullPointerException();
    this.callable = callable;
    this.state = NEW;       // ensure visibility of callable
}

//线程池THREAD_POOL_EXECUTOR中执行的方法
public void run() {
    if (state != NEW ||
        !U.compareAndSwapObject(this, RUNNER, null, Thread.currentThread()))
        return;
    try {
        Callable<V> c = callable;       
        if (c != null && state == NEW) {
            V result;
            boolean ran;
            try {
                result = c.call();   //调用的是mWorker的call方法
                ran = true;
            } catch (Throwable ex) {
                result = null;
                ran = false;
                setException(ex);
            }
            if (ran)
                set(result);
        }
    } finally {
        // runner must be non-null until state is settled to
        // prevent concurrent calls to run()
        runner = null;
        // state must be re-read after nulling runner to prevent
        // leaked interrupts
        int s = state;
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
}

由上述可知,线程池THREAD_POOL_EXECUTOR中执行的mFuture.run()方法中实际上会调用result = c.call()即实现了Callable接口的类的WorkerRunnable对象mWorker.call();

其中WorkerRunnable类对象mWorker、FutureTask类对象mFuture的创建和关联位于AsyncTask构造函数中;

AsyncTask构造函数

    //部分需要联系使用的成员变量
    //注意是静态的,类共享一个线程池
    private static volatile Executor sDefaultExecutor = SERIAL_EXECUTOR;
    //注意是静态的变量,类共享一个handler
    private static InternalHandler sHandler;                        

    private final WorkerRunnable<Params, Result> mWorker;
    private final FutureTask<Result> mFuture;
    private final Handler mHandler;
    
    /**
     * Creates a new asynchronous task. This constructor must be invoked on the UI thread.
     */
    public AsyncTask() {
        this((Looper) null);
    }

    /**
     * Creates a new asynchronous task. This constructor must be invoked on the UI thread.
     *
     * @hide
     */
    public AsyncTask(@Nullable Handler handler) {
        this(handler != null ? handler.getLooper() : null);
    }

    /**
     * Creates a new asynchronous task. This constructor must be invoked on the UI thread.
     *
     * @hide
     */
    public AsyncTask(@Nullable Looper callbackLooper) {
        mHandler = callbackLooper == null || callbackLooper == Looper.getMainLooper()
            ? getMainHandler()
            : new Handler(callbackLooper);

        mWorker = new WorkerRunnable<Params, Result>() {
            public Result call() throws Exception {
                mTaskInvoked.set(true);
                Result result = null;
                try {
                    Process.setThreadPriority(Process.THREAD_PRIORITY_BACKGROUND);
                    //noinspection unchecked
                    result = doInBackground(mParams); //线程池中最终执行doInBackground
                    Binder.flushPendingCommands();
                } catch (Throwable tr) {
                    mCancelled.set(true);
                    throw tr;
                } finally {
                    postResult(result);
                }
                return result;
            }
        };

        mFuture = new FutureTask<Result>(mWorker) {
            @Override
            protected void done() {
                try {
                    postResultIfNotInvoked(get());
                } catch (InterruptedException e) {
                    android.util.Log.w(LOG_TAG, e);
                } catch (ExecutionException e) {
                    throw new RuntimeException("An error occurred while executing doInBackground()",
                            e.getCause());
                } catch (CancellationException e) {
                    postResultIfNotInvoked(null);
                }
            }
        };
    }

由上可以看出默认构造函数会执行

public AsyncTask(@Nullable Looper callbackLooper),传入参数为null,
此时mHandler = getMainHandler()获取到一个主线程的handler;
创建了mWorker和mFuture对象,并通过mFuture = new FutureTask<Result>(mWorker)
完成mFuture与mWorker的一个关联;

后续,线程池THREAD_POOL_EXECUTOR中执行的mFuture.run()会执行到mWorker.call()由上可看出:

    try {
            Process.setThreadPriority(Process.THREAD_PRIORITY_BACKGROUND);
            //noinspection unchecked
            result = doInBackground(mParams); //线程池中最终执行doInBackground
            Binder.flushPendingCommands();
        } catch (Throwable tr) {
            mCancelled.set(true);
            throw tr;
        } finally {
            postResult(result);
        }

触发执行了doInBackground(mParams)后,通过postResult(result)将结果result通过hander发出最终在handler的ui线程中触发了onPostExecute回调;

postResult函数

private Result postResult(Result result) {
        @SuppressWarnings("unchecked")
        Message message = getHandler().obtainMessage(MESSAGE_POST_RESULT,
                new AsyncTaskResult<Result>(this, result));
        message.sendToTarget();
        return result;
    }
使用handler发送处理结束的消息

getMainHandler函数

private static Handler getMainHandler() {
    synchronized (AsyncTask.class) {
        if (sHandler == null) {
            sHandler = new InternalHandler(Looper.getMainLooper());
        }
        return sHandler;
    }
}  

该方法简单的创建一个handler通过Looper.getMainLooper()使得该handler运行于ui线程,注意的是sHandler是静态的,也就是说进程所有的AsyncTask共享一个handler进行处理消息;

InternalHandler类和finish方法

private static class InternalHandler extends Handler {
        public InternalHandler(Looper looper) {
            super(looper);
        }
    
        @SuppressWarnings({"unchecked", "RawUseOfParameterizedType"})
        @Override
        public void handleMessage(Message msg) {
            AsyncTaskResult<?> result = (AsyncTaskResult<?>) msg.obj;
            switch (msg.what) {
            //接收消息并且在ui线程回调执行finish最终调用onPostExecute方法
                case MESSAGE_POST_RESULT:
                    // There is only one result
                    result.mTask.finish(result.mData[0]);
                    break;
            //接收消息并且在ui线程回调执行onProgressUpdate方法
                case MESSAGE_POST_PROGRESS:    
                    result.mTask.onProgressUpdate(result.mData);
                    break;
            }
        }
    }
    private void finish(Result result) {
        if (isCancelled()) {
      //任务执行完之后,如果用户之前调用了AsyncTask.cancle()则触发onCancelled
            onCancelled(result);
        } else {
      //否则触发onPostExecute
            onPostExecute(result);  
        }
        mStatus = Status.FINISHED;
    }

源码总结

SerialExecutor implements Executor :    sDefaultExecutor
ThreadPoolExecutor :                    THREAD_POOL_EXECUTOR:
进程中对于使用execute开始的所有的AsyncTask顺序排队执行
  mWorker = new WorkerRunnable<Params, Result>(){..}..
  mFuture = new FutureTask<Result>(mWorker) {..}..

扫描关注微信公众号:


qrcode_for_gh_1bbc19ef669d_258.jpg
上一篇 下一篇

猜你喜欢

热点阅读