AndroidAndroid开发Android开发

(3)线程系列 - 一起解读ThreadUtils源码

2021-05-24  本文已影响0人  zhongjh

开发第三方库的时候往往不会用到rxjava,会使用Thread来处理线程。那么就直接开门见山,让我们一步一步解读ThreadUtils源码。作者阅读源码习惯是先从public方法一个一个硬读下去,再配合使用方法来使用。

    /**
     * 根据类型获取线程池
     * @param type 类型
     * @param priority 优先级
     * @return 线程池
     */
    private static ExecutorService getPoolByTypeAndPriority(final int type, final int priority) {
        // 同步map安全,防止线程不安全创建多个 Map线程池
        synchronized (TYPE_PRIORITY_POOLS) {
            ExecutorService pool;
            // 通过类型获取 Map线程池
            Map<Integer, ExecutorService> priorityPools = TYPE_PRIORITY_POOLS.get(type);
            if (priorityPools == null) {
                // 如果没有 Map线程池 则新建一个
                priorityPools = new ConcurrentHashMap<>();
                pool = ThreadPoolExecutor4Util.createPool(type, priority);
                // 加入线程池
                priorityPools.put(priority, pool);
                // 新建后加入 Map线程池
                TYPE_PRIORITY_POOLS.put(type, priorityPools);
            } else {
                // 根据线程池优先级获取线程池
                pool = priorityPools.get(priority);
                // 如果没有该线程池,则创建新的线程池
                if (pool == null) {
                    pool = ThreadPoolExecutor4Util.createPool(type, priority);
                    priorityPools.put(priority, pool);
                }
            }
            return pool;
        }
    }

从上面代码可知Map TYPE_PRIORITY_POOLS里面包含了Map ExecutorService,
也就是说线程池包含进了两层map,最外的第一层是key为线程池类型的,里面的第二层是key为优先级的。

        /**
         * 创建线程池
         * @param type 类型
         * @param priority 优先级
         * @return 线程池
         */
        private static ExecutorService createPool(final int type, final int priority) {
            switch (type) {
                case TYPE_SINGLE:
                    // 创建 核心线程数为1,线程池最大线程数量为1,非核心线程空闲存活时长为0
                    // 只创建一个线程确保 顺序执行的场景,并且只有一个线程在执行
                    return new ThreadPoolExecutor4Util(1, 1,
                            0L, TimeUnit.MILLISECONDS,
                            new LinkedBlockingQueue4Util(),
                            new UtilsThreadFactory("single", priority)
                    );
                case TYPE_CACHED:
                    // 创建 核心线程数为0,线程池最大线程数量为128,非核心线程空闲存活时长为60秒
                    // 线程数为128个一般用于处理执行时间比较短的任务
                    return new ThreadPoolExecutor4Util(0, 128,
                            60L, TimeUnit.SECONDS,
                            new LinkedBlockingQueue4Util(true),
                            new UtilsThreadFactory("cached", priority)
                    );
                case TYPE_IO:
                    // 创建 核心线程数为可计算资源*2+1,线程池最大线程数量为可计算资源*2+1,非核心线程空闲存活时长为30秒
                    return new ThreadPoolExecutor4Util(2 * CPU_COUNT + 1, 2 * CPU_COUNT + 1,
                            30, TimeUnit.SECONDS,
                            new LinkedBlockingQueue4Util(),
                            new UtilsThreadFactory("io", priority)
                    );
                case TYPE_CPU:
                    // 创建 核心线程数为可计算资源+1,线程池最大线程数量为可计算资源*2+1,非核心线程空闲存活时长为30秒
                    return new ThreadPoolExecutor4Util(CPU_COUNT + 1, 2 * CPU_COUNT + 1,
                            30, TimeUnit.SECONDS,
                            new LinkedBlockingQueue4Util(true),
                            new UtilsThreadFactory("cpu", priority)
                    );
                default:
                    // 创建 核心线程数、线程池最大数量为自定义的,空闲存活时长为0
                    return new ThreadPoolExecutor4Util(type, type,
                            0L, TimeUnit.MILLISECONDS,
                            new LinkedBlockingQueue4Util(),
                            new UtilsThreadFactory("fixed(" + type + ")", priority)
                    );
            }
        }

除了创建,还重写了afterExecute方法和execute方法,不过这两个方法暂时没什么意义所以先不管。

在线程系列第二节讲到了创建线程池需要的ThreadFactory threadFactory(线程工厂)、BlockingQueue workQueue(任务队列),该ThreadUtils类自定义了线程工厂和任务队列,所以我们先了解该两类

那么在这里我们基本知道了创建线程池的流程了,接下来,是我们调用该工具类时调用线程的整个流程是怎样的,通过断点调试,我们是最容易了解这个过程的,如图:


调用线程的整个流程
        @Override
        public void run() {
            // 判断是否循环计划内的
            if (isSchedule) {
                // 因为如果是在循环内的,那么runner还是之前的
                if (runner == null) {
                    // 判断当前状态如果是New,便赋值state=RUNNING,如果不是New,便返回
                    if (!state.compareAndSet(NEW, RUNNING)) {
                        return;
                    }
                    // 获取当前线程
                    runner = Thread.currentThread();
                    if (mTimeoutListener != null) {
                        Log.w("ThreadUtils", "Scheduled task doesn't support timeout.");
                    }
                } else {
                    // 如果不是RUNNING便直接返回
                    if (state.get() != RUNNING) {
                        return;
                    }
                }
            } else {
                // 判断当前状态如果是New,便赋值state=RUNNING,如果不是New,便返回
                if (!state.compareAndSet(NEW, RUNNING)) {
                    return;
                }
                // 获取当前线程
                runner = Thread.currentThread();
                if (mTimeoutListener != null) {
                    // 实例化 循环或延迟任务的线程池
                    mExecutorService = new ScheduledThreadPoolExecutor(1, (ThreadFactory) Thread::new);
                    // 调用了延迟运行任务
                    mExecutorService.schedule(new TimerTask() {
                        @Override
                        public void run() {
                            if (!isDone() && mTimeoutListener != null) {
                                timeout();
                                mTimeoutListener.onTimeout();
                            }
                        }
                    }, mTimeoutMillis, TimeUnit.MILLISECONDS);
                }
            }
            try {
                // 执行doInBackground方法获取值
                final T result = doInBackground();
                // 判断是否循环计划内的
                if (isSchedule) {
                    // 如果不是RUNNING便直接返回
                    if (state.get() != RUNNING) {
                        return;
                    }
                    getDeliver().execute(() -> onSuccess(result));
                } else {
                    // 判断当前状态如果是RUNNING,便赋值state=COMPLETING,如果不是RUNNING,便返回
                    if (!state.compareAndSet(RUNNING, COMPLETING)) {
                        return;
                    }
                    // 执行成功方法,getDeliver()已经封装了跳转ui线程
                    getDeliver().execute(() -> {
                        onSuccess(result);
                        onDone();
                    });
                }
            } catch (InterruptedException ignore) {
                // 被中断了,判断当前状态如果是CANCELLED,便赋值state=INTERRUPTED
                state.compareAndSet(CANCELLED, INTERRUPTED);
            } catch (final Throwable throwable) {
                // 如果出现异常了,判断当前状态如果是RUNNING,便赋值EXCEPTIONAL
                if (!state.compareAndSet(RUNNING, EXCEPTIONAL)) {
                    return;
                }
                // 执行成功方法,getDeliver()已经封装了跳转ui线程
                getDeliver().execute(() -> {
                    onFail(throwable);
                    onDone();
                });
            }
        }

从上面整个流程可以几个核心方法,我概括出来:

  1. 拥有者跟rxjava类似的doInBackground,onSucces等等方法
  2. 通过源码得知有延时、循环周期运行线程等方法
  3. 根据使用方式不一样来决定io,cpu线程数量,缓存时间,是否使用队列
上一篇下一篇

猜你喜欢

热点阅读