线程池的原理和AsyncTask

2020-08-06  本文已影响0人  积跬步以致千里_ylc

线程池

1.什么是线程池?为什么要用线程池?

Java中的线程池是运用场景最多的并发框架,几乎所有需要异步或并发执行任务的程序都可以使用线程池。线程池就是将线程进行池化,需要运行任务时从池中拿一个线程来执行,执行完毕,线程放回池中。
在开发过程中,合理地使用线程池能够带来3个好处:

  1. 降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
  2. 提高响应速度。当任务到达时,任务可以不需要等到线程创建就能立即执行。假设一个服务器完成一项任务所需时间为:T1 创建线程时间,T2 在线程中执行任务的时间,T3 销毁线程时间。 如果:T1 + T3 远大于 T2,则可以采用线程池,以提高服务器性能。线程池技术正是关注如何缩短或调整T1,T3时间的技术,从而提高服务器程序性能的。它把T1,T3分别安排在服务器程序的启动和结束的时间段或者一些空闲的时间段,这样在服务器程序处理客户请求时,不会有T1,T3的开销了。
  3. 提高线程的可管理性。线程是稀缺资源,如果无限制地创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一分配、调优和监控。

2.JDK中的线程池和构造方法的参数解析

线程池的创建各个参数含义,这是线程池的初始方法:

public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,

BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler)

2.1 corePoolSize
线程池中的核心线程数,当提交一个任务时,线程池创建一个新线程执行任务,直到当前线程数等于corePoolSize;
如果当前线程数为corePoolSize,继续提交的任务被保存到阻塞队列中,等待被执行;
如果执行了线程池的prestartAllCoreThreads()方法,线程池会提前创建并启动所有核心线程。
2.2maximumPoolSize
线程池中允许的最大线程数。如果当前阻塞队列满了,且继续提交任务,则创建新的线程执行任务,前提是当前线程数小于maximumPoolSize
2.3keepAliveTime
线程空闲时的存活时间,即当线程没有任务执行时,继续存活的时间。默认情况下,该参数只在线程数大于corePoolSize时才有用
2.4 TimeUnit keepAliveTime的时间单位
2.5 workQueue
workQueue必须是BlockingQueue阻塞队列。当线程池中的线程数超过它的corePoolSize的时候,线程会进入阻塞队列进行阻塞等待。通过workQueue,线程池实现了阻塞功能

什么是阻塞队列 ?
常用阻塞队列

• ArrayBlockingQueue:一个由数组结构组成的有界阻塞队列。
• LinkedBlockingQueue:一个由链表结构组成的有界阻塞队列。
• PriorityBlockingQueue:一个支持优先级排序的无界阻塞队列。
• DelayQueue:一个使用优先级队列实现的无界阻塞队列。
• SynchronousQueue:一个不存储元素的阻塞队列。
• LinkedTransferQueue:一个由链表结构组成的无界阻塞队列。
• LinkedBlockingDeque:一个由链表结构组成的双向阻塞队列。
2.6 threadFactory
创建线程的工厂,通过自定义的线程工厂可以给每个新建的线程设置一个具有识别度的线程名Executors静态工厂里默认的threadFactory,线程的命名规则是“pool-数字-thread-数字”
2.7 rejectedExecutionHandler(饱和策略)
线程池的饱和策略,当阻塞队列满了,且没有空闲的工作线程,如果继续提交任务,必须采取一种策略处理该任务,线程池提供了4种策略:
(1)AbortPolicy:直接抛出异常,默认策略;
(2)CallerRunsPolicy:用调用者所在的线程来执行任务;
(3)DiscardOldestPolicy:丢弃阻塞队列中靠最前的任务,并执行当前任务;
(4)DiscardPolicy:直接丢弃任务;
当然也可以根据应用场景实现RejectedExecutionHandler接口,自定义饱和策略,如记录日志或持久化存储不能处理的任务。

3.线程池的工作机制

1)如果当前运行的线程少于corePoolSize,则创建新线程来执行任务(注意,执行这一步骤需要获取全局锁)。
2)如果运行的线程等于或多于corePoolSize,则将任务加入BlockingQueue。
3)如果无法将任务加入BlockingQueue(队列已满),则创建新的线程来处理任务(注意,执行这一步骤需要获取全局锁)。
4)如果创建新线程将使当前运行的线程超出maximumPoolSize,任务将被拒绝,并调用RejectedExecutionHandler.rejectedExecution()方法。

4.如何合理配置线程池

要想合理地配置线程池,就必须首先分析任务特性,可以从以下几个角度来分析。
•任务的性质:CPU密集型任务、IO密集型任务和混合型任务。
•任务的优先级:高、中和低。
•任务的执行时间:长、中和短。
•任务的依赖性:是否依赖其他系统资源,如数据库连接。
性质不同的任务可以用不同规模的线程池分开处理。CPU密集型任务应配置尽可能小的线程,如配置Ncpu+1个线程的线程池。由于IO密集型任务线程并不是一直在执行任务,则应配置尽可能多的线程,如2*Ncpu。混合型的任务,如果可以拆分,将其拆分成一个CPU密集型任务和一个IO密集型任务,只要这两个任务执行的时间相差不是太大,那么分解后执行的吞吐量将高于串行执行的吞吐量。如果这两个任务执行时间相差太大,则没必要进行分解。可以通过Runtime.getRuntime().availableProcessors()方法获得当前设备的CPU个数。(Ncpu:当前cpu的核心数)

AsyncTask

1.为什么需要asyncTask

在Android当中,当一个应用程序的组件启动的时候,并且没有其他的应用程序组件在运行时,Android系统就会为该应用程序组件开辟一个新的线程来执行。默认的情况下,在一个相同Android应用程序当中,其里面的组件都是运行在同一个线程里面的,这个线程我们称之为Main线程。当我们通过某个组件来启动另一个组件的时候,这个时候默认都是在同一个线程当中完成的。
在Android当中,通常将线程分为两种,一种叫做Main Thread,除了Main Thread之外的线程都可称为Worker Thread。
当一个应用程序运行的时候,Android操作系统就会给该应用程序启动一个线程,这个线程就是我们的Main Thread,这个线程非常的重要,它主要用来加载我们的UI界面,完成系统和我们用户之间的交互,并将交互后的结果又展示给我们用户,所以Main Thread又被称为UI Thread。
Android系统默认不会给我们的应用程序组件创建一个额外的线程,所有的这些组件默认都是在同一个线程中运行。然而,某些时候当我们的应用程序需要完成一个耗时的操作的时候,例如访问网络或者是对数据库进行查询时,此时我们的UI Thread就会被阻塞。例如,当我们点击一个Button,然后希望其从网络中获取一些数据,如果此操作在UI Thread当中完成的话,当我们点击Button的时候,UI线程就会处于阻塞的状态,此时,我们的系统不会调度任何其它的事件,更糟糕的是,当我们的整个UI线程如果阻塞时间超过5秒钟(官方是这样说的),这个时候就会出现 ANR (Application Not Responding)的现象,此时,应用程序会弹出一个框,让用户选择是否退出该程序。对于Android开发来说,出现ANR的现象是绝对不能被允许的。
另外,由于我们的Android UI控件是线程不安全的,所以我们不能在UI Thread之外的线程当中对我们的UI控件进行操作。因此在Android的多线程编程当中,我们有两条非常重要的原则必须要遵守:
绝对不能在UI Thread当中进行耗时的操作,不能阻塞我们的UI Thread
不能在UI Thread之外的线程当中操纵我们的UI元素
既然在Android当中有两条重要的原则要遵守,那么我们可能就有疑问了?我们既不能在主线程当中处理耗时的操作,又不能在工作线程中来访问我们的UI控件,那么我们比如从网络中要下载一张图片,又怎么能将其更新到UI控件上呢?这就关系到了我们的主线程和工作线程之间的通信问题了。在Android当中,提供了两种方式来解决线程直接的通信问题,一种是通过Handler的机制,这个时候就很可能自己会去封装一下thread+handler了,正是因为这类需求很多,google就帮我们封装了 AsyncTask。

2.AsyncTask原理分析

AsyncTask是个abstract类,所以在使用时需要实现一个AsyncTask的具体实现类,一般来说会覆盖4个方法,我们以前面所说的从网络中下载一张图片,然后更新到UI控件来说明:
(1)onPreExecute():在执行后台下载操作之前调用,将下载等待动画显示出来,运行在主线程中;
(2)doInBackground():核心方法,执行后台下载操作的方法,必须实现的一个方法,运行在子线程中;这个方法是执行在子线程中的。在onPreExecute()执行完后,会立即开启这个方法。
(3)onProgressUpdate():在下载操作doInBackground()中调用publishProgress()时的回调方法,用于更新下载进度,运行在主线程中;
(4)onPostExecute():后台下载操作完成后调用,将下载等待动画进行隐藏,并更新UI,运行在主线程中,然后在主线程中使用AsyncTask:

  //tvProgressBar:显示进度的控件,ivShowBitmap:显示下载图片的控件,根据业务需求觉得构造参数。
  MyTestAsyncTask testAsyncTask = new MyTestAsyncTask(tvProgressBar,ivShowBitmap);
  testAsyncTask.execute("图片的下载地址");
 public AsyncTask(@Nullable Looper callbackLooper) {
        mHandler = callbackLooper == null || callbackLooper == Looper.getMainLooper()
            ? getMainHandler()
            : new Handler(callbackLooper);

        mWorker = new WorkerRunnable<Params, Result>() {
            public Result call() throws Exception {
                mTaskInvoked.set(true);
                Result result = null;
                try {
                    Process.setThreadPriority(Process.THREAD_PRIORITY_BACKGROUND);
                    //noinspection unchecked
                    result = doInBackground(mParams);
                    Binder.flushPendingCommands();
                } catch (Throwable tr) {
                    mCancelled.set(true);
                    throw tr;
                } finally {
                    postResult(result);
                }
                return result;
            }
        };

        mFuture = new FutureTask<Result>(mWorker) {
            @Override
            protected void done() {
                try {
                    postResultIfNotInvoked(get());
                } catch (InterruptedException e) {
                    android.util.Log.w(LOG_TAG, e);
                } catch (ExecutionException e) {
                    throw new RuntimeException("An error occurred while executing doInBackground()",
                            e.getCause());
                } catch (CancellationException e) {
                    postResultIfNotInvoked(null);
                }
            }
        };
    }

mWorker代表了AsyncTask要执行的任务,是对Callable接口的封装,意味着这个任务是有返回值的

   private static abstract class WorkerRunnable<Params, Result> implements Callable<Result> {
        Params[] mParams;
    }

mFuture代表了AsyncTask要执行的任务的返回结果,其实就是个FutureTask,安装FutureTask标准用法,mWorker作为Callable被传给了mFuture,那么mFuture的结果就从mWorker执行的任务中取得。仔细看mWorker,return语句返回的结果就是我们前面所说的doInBackground()的执行结果:


执行结果
private static final int CORE_POOL_SIZE = 1;
    private static final int MAXIMUM_POOL_SIZE = 20;
    private static final int BACKUP_POOL_SIZE = 5;
    private static final int KEEP_ALIVE_SECONDS = 3;

    private static final ThreadFactory sThreadFactory = new ThreadFactory() {
        private final AtomicInteger mCount = new AtomicInteger(1);

        public Thread newThread(Runnable r) {
            return new Thread(r, "AsyncTask #" + mCount.getAndIncrement());
        }
    };

    // Used only for rejected executions.
    // Initialization protected by sRunOnSerialPolicy lock.
    private static ThreadPoolExecutor sBackupExecutor;
    private static LinkedBlockingQueue<Runnable> sBackupExecutorQueue;

    private static final RejectedExecutionHandler sRunOnSerialPolicy =
            new RejectedExecutionHandler() {
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            android.util.Log.w(LOG_TAG, "Exceeded ThreadPoolExecutor pool size");
            // As a last ditch fallback, run it on an executor with an unbounded queue.
            // Create this executor lazily, hopefully almost never.
            synchronized (this) {
                if (sBackupExecutor == null) {
                    sBackupExecutorQueue = new LinkedBlockingQueue<Runnable>();
                    sBackupExecutor = new ThreadPoolExecutor(
                            BACKUP_POOL_SIZE, BACKUP_POOL_SIZE, KEEP_ALIVE_SECONDS,
                            TimeUnit.SECONDS, sBackupExecutorQueue, sThreadFactory);
                    sBackupExecutor.allowCoreThreadTimeOut(true);
                }
            }
            sBackupExecutor.execute(r);
        }
    };

    /**
     * An {@link Executor} that can be used to execute tasks in parallel.
     */
    public static final Executor THREAD_POOL_EXECUTOR;

    static {
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
                CORE_POOL_SIZE, MAXIMUM_POOL_SIZE, KEEP_ALIVE_SECONDS, TimeUnit.SECONDS,
                new SynchronousQueue<Runnable>(), sThreadFactory);
        threadPoolExecutor.setRejectedExecutionHandler(sRunOnSerialPolicy);
        THREAD_POOL_EXECUTOR = threadPoolExecutor;
    }

基本的初始化参数都被固定,根据实际情况再酌情调整。

private final Handler mHandler;
private Handler getHandler() {
        return mHandler;
}

private static Handler getMainHandler() {
        synchronized (AsyncTask.class) {
            if (sHandler == null) {
                sHandler = new InternalHandler(Looper.getMainLooper());
            }
            return sHandler;
        }
    }
 private static class InternalHandler extends Handler {
        public InternalHandler(Looper looper) {
            super(looper);
        }

        @SuppressWarnings({"unchecked", "RawUseOfParameterizedType"})
        @Override
        public void handleMessage(Message msg) {
            AsyncTaskResult<?> result = (AsyncTaskResult<?>) msg.obj;
            switch (msg.what) {
                case MESSAGE_POST_RESULT:
                    // There is only one result
                    result.mTask.finish(result.mData[0]);
                    break;
                case MESSAGE_POST_PROGRESS:
                    result.mTask.onProgressUpdate(result.mData);
                    break;
            }
        }
    }

在AsyncTask的构造函数中进行初始化:


mHandler初始化

总结:

  1. 线程池的创建:
    在创建了AsyncTask的时候,会默认创建两个线程池SerialExecutor和ThreadPoolExecutor,SerialExecutor负责将任务串行化,ThreadPoolExecutor是真正执行任务的地方,且无论有多少个AsyncTask实例,两个线程池都会只有一份。
  2. 任务的执行:
    在execute中,会执行run方法,当执行完run方法后,会调用scheduleNext()不断的从双端队列中轮询,获取下一个任务并继续放到一个子线程中执行,直到异步任务执行完毕。
  3. 消息的处理:
    在执行完onPreExecute()方法之后,执行了doInBackground()方法,然后就不断的发送请求获取数据;在这个AsyncTask中维护了一个InternalHandler的类,这个类是继承Handler的,获取的数据是通过handler进行处理和发送的。在其handleMessage方法中,将消息传递给onProgressUpdate()进行进度的更新,也就可以将结果发送到主线程中,进行界面的更新了。
  4. 使用AsyncTask的注意点
    通过观察代码我们可以发现,每一个new出的AsyncTask只能执行一次execute()方法,多次运行将会报错,如需多次,需要新new一个AsyncTask。


    execute()
    execute()调用的执行状态判断

AsyncTask和Handler优缺点

CAS 原子操作

什么是原子操作?如何实现原子操作?

CAS实现原子操作的三大问题

  1. ABA问题。因为CAS需要在操作值的时候,检查值有没有发生变化,如果没有发生变化则更新,但是如果一个值原来是A,变成了B,又变成了A,那么使用CAS进行检查时会发现它的值没有发生变化,但是实际上却变化了。ABA问题的解决思路就是使用版本号。在变量前面追加上版本号,每次变量更新的时候把版本号加1,那么A→B→A就会变成1A→2B→3A。
    举个通俗点的例子,你倒了一杯饮料放桌子上,干了点别的事,然后同事把你的饮料喝了又给你重新倒了一杯饮料,你回来看饮料还在,拿起来就喝。如果你不管饮料在你离开的过程中间被人喝过,只关心饮料还在不在,这就是ABA问题。如果你是一个讲卫生讲文明的小伙子,不但关心饮料在不在,还要关心在你离开的时候饮料被人动过没有,因为你是程序员,所以就想起了放了张纸在旁边,写上初始值0,别人喝饮料前麻烦先做个累加才能喝饮料。
  2. 循环时间长开销大。自旋CAS如果长时间不成功,会给CPU带来非常大的执行开销。
  3. 只能保证一个共享变量的原子操作。当对一个共享变量执行操作时,我们可以使用循环CAS的方式来保证原子操作,但是对多个共享变量操作时,循环CAS就无法保证操作的原子性,这个时候就可以用锁。还有一个取巧的办法,就是把多个共享变量合并成一个共享变量来操作。比如,有两个共享变量A=3,C=X,合并一下AC=3X,然后用CAS来操作AC。从Java 1.5开始,JDK提供了AtomicReference类来保证引用对象之间的原子性,就可以把多个变量放在一个对象里来进行CAS操作。

先抛出一个ABA问题的场景 以及运行结果

ABA问题

利用AtomicStampedReference解决这个问题的,代码如下:

 public static void main(String[] args) {
        AtomicStampedReference<Integer> stamped = new AtomicStampedReference<>(10, 1);
        new Thread(() -> {
            int stamp1 = stamped.getStamp();
            System.out.println(Thread.currentThread().getName() + "--第1次版本号:" + stamp1);
            stamped.compareAndSet(10, 16, stamp1, stamp1 + 1);

            int stamp2 = stamped.getStamp();
            System.out.println(Thread.currentThread().getName() + "--第2次版本号:" + stamp2);
            stamped.compareAndSet(16, 10, stamp2, stamp2 + 1);

            int stamp3 = stamped.getStamp();
            System.out.println(Thread.currentThread().getName() + "--第3次版本号:" + stamp3);
        }, "钱罐罐").start();

        new Thread(() -> {
            try {
                int stamp1 = stamped.getStamp();
                System.out.println(Thread.currentThread().getName() + "--第1次版本号:" + stamp1);
                TimeUnit.SECONDS.sleep(2);
                boolean isSuccess = stamped.compareAndSet(10, 18,
                        stamped.getStamp(), stamped.getStamp() + 1);
//                boolean isSuccess = stamped.compareAndSet(10, 18, 1, stamped.getStamp() + 1);
                System.out.println(Thread.currentThread().getName() + "--修改是否成功:" + isSuccess
                        + ",当前版本:" + stamped.getStamp() + ",当前实际值:" + stamped.getReference());
            } catch (Exception e) {
                e.printStackTrace();
            }
        }, "王麻子").start();
    }

结果

image.png
compareAndSet(V expectedReference, V newReference, int expectedStamp, int newStamp)。
(1)第一个参数expectedReference:表示预期值。
(2)第二个参数newReference:表示要更新的值。
(3)第三个参数expectedStamp:表示预期的标记戳(版本号)。
(4)第四个参数newStamp:表示要更新的标记戳。
其实除了AtomicStampedReference类,还有一个原子类也可以解决,就是AtomicMarkableReference,它不是维护一个版本号,而是维护一个boolean类型的标记,用法没有AtomicStampedReference灵活。因此也只是在特定的场景下使用。
上一篇下一篇

猜你喜欢

热点阅读