半栈工程师AndroidAndroid群英传学习之旅

妈妈再也不用担心你不会使用线程池了(ThreadUtils)

2018-05-23  本文已影响1629人  Blankj

为什么要用线程池

使用线程池管理线程有如下优点:

  1. 降低资源消耗:通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
  2. 提高响应速度:当任务到达时,任务可以不需要等到线程创建就能立即执行。
  3. 提高线程的可管理性:线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。

线程池介绍

ThreadPoolExecutor

Java 为我们提供了 ThreadPoolExecutor 来创建一个线程池,其完整构造函数如下所示:

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler)

线程池执行策略

当一个任务要被添加进线程池时,有以下四种执行策略:

  1. 线程数量未达到 corePoolSize,则新建一个线程(核心线程)执行任务。
  2. 线程数量达到了 corePoolsSize,则将任务移入队列等待。
  3. 队列已满,新建非核心线程执行任务。
  4. 队列已满,总线程数又达到了 maximumPoolSize,就会由 RejectedExecutionHandler 抛出异常。

其流程图如下所示:

image

常见的四类线程池

常见的四类线程池分别有 FixedThreadPool、SingleThreadExecutor、ScheduledThreadPool 和 CachedThreadPool,它们其实都是通过 ThreadPoolExecutor 创建的,其参数如下表所示:

线程池名称 corePoolSize maximumPoolSize keepAliveTime unit workQueue threadFactory handler 适用场景
FixedThreadPool nThreads nThreads 0 MILLISECONDS LinkedBlockingQueue defaultThreadFactory defaultHandler 已知并发压力的情况下,对线程数做限制
SingleThreadExecutor 1 1 0 MILLISECONDS LinkedBlockingQueue defaultThreadFactory defaultHandler 需要保证顺序执行的场景,并且只有一个线程在执行
ScheduledThreadPool corePoolSize Integer.MAX_VALUE 10 MILLISECONDS DelayedWorkQueue defaultThreadFactory defaultHandler 需要多个后台线程执行周期任务的场景
CachedThreadPool 0 Integer.MAX_VALUE 60 SECONDS SynchronousQueue defaultThreadFactory defaultHandler 处理执行时间比较短的任务

如果你不想自己写一个线程池,那么你可以从上面看看有没有符合你要求的(一般都够用了),如果有,那么很好你直接用就行了,如果没有,那你就老老实实自己去写一个吧。

合理地配置线程池

需要针对具体情况而具体处理,不同的任务类别应采用不同规模的线程池,任务类别可划分为 CPU 密集型任务、IO 密集型任务和混合型任务。

线程池工具类封装及使用

为了提升开发效率及更好地使用和管理线程池,我已经为你们封装好了线程工具类----ThreadUtils,依赖 AndroidUtilCode 1.16.1 版本即可使用,其 API 如下所示:

isMainThread            : 判断当前是否主线程
getFixedPool            : 获取固定线程池
getSinglePool           : 获取单线程池
getCachedPool           : 获取缓冲线程池
getIoPool               : 获取 IO 线程池
getCpuPool              : 获取 CPU 线程池
executeByFixed          : 在固定线程池执行任务
executeByFixedWithDelay : 在固定线程池延时执行任务
executeByFixedAtFixRate : 在固定线程池按固定频率执行任务
executeBySingle         : 在单线程池执行任务
executeBySingleWithDelay: 在单线程池延时执行任务
executeBySingleAtFixRate: 在单线程池按固定频率执行任务
executeByCached         : 在缓冲线程池执行任务
executeByCachedWithDelay: 在缓冲线程池延时执行任务
executeByCachedAtFixRate: 在缓冲线程池按固定频率执行任务
executeByIo             : 在 IO 线程池执行任务
executeByIoWithDelay    : 在 IO 线程池延时执行任务
executeByIoAtFixRate    : 在 IO 线程池按固定频率执行任务
executeByCpu            : 在 CPU 线程池执行任务
executeByCpuWithDelay   : 在 CPU 线程池延时执行任务
executeByCpuAtFixRate   : 在 CPU 线程池按固定频率执行任务
executeByCustom         : 在自定义线程池执行任务
executeByCustomWithDelay: 在自定义线程池延时执行任务
executeByCustomAtFixRate: 在自定义线程池按固定频率执行任务
cancel                  : 取消任务的执行

如果你使用 RxJava 很 6,而且项目中已经使用了 RxJava,那么你可以继续使用 RxJava 来做线程切换的操作;如果你并不会 RxJava 或者是在开发 SDK,那么这个工具类再适合你不过了,它可以为你统一管理线程池的使用,不至于让你的项目中出现过多的线程池。

ThreadUtils 使用极为方便,看 API 即可明白相关意思,FixedPool、SinglePool、CachedPool 分别对应了上面介绍的 FixedThreadPool、SingleThreadExecutor、CachedThreadPool 这三种,IoPool 是创建 (CPU_COUNT * 2 + 1) 个核心线程数,CpuPool 是建立 (CPU_COUNT + 1) 个核心线程数;而所有的 execute 都是线程池外围裹了一层 ScheduledThreadPool,这里和 RxJava 线程池的实现有所相似,可以更方便地提供延时任务和固定频率执行的任务,当然也可以更方便地取消任务的执行,下面让我们来简单地来介绍其使用,以从 assets 中拷贝 APK 到 SD 卡为例,其代码如下所示:

public static void releaseInstallApk(final OnReleasedListener listener) {
    if (!FileUtils.isFileExists(Config.TEST_APK_PATH)) {
        ThreadUtils.executeByIo(new ThreadUtils.SimpleTask<Void>() {
            @Override
            public Void doInBackground() throws Throwable {
                ResourceUtils.copyFileFromAssets("test_install", Config.TEST_APK_PATH);
                return null;
            }

            @Override
            public void onSuccess(Void result) {
                if (listener != null) {
                    listener.onReleased();
                }
            }
        });
    } else {
        if (listener != null) {
            listener.onReleased();
        }
        LogUtils.d("test apk existed.");
    }
}

看起来还不是很优雅是吧,你可以把相关的 Task 都抽出来放到合适的包下,这样每个 Task 的指责一看便知,如上例子可以改装成如下所示:

public class ReleaseInstallApkTask extends ThreadUtils.SimpleTask<Void> {

    private OnReleasedListener mListener;

    public ReleaseInstallApkTask(final OnReleasedListener listener) {
        mListener = listener;
    }

    @Override
    public Void doInBackground() throws Throwable {
        ResourceUtils.copyFileFromAssets("test_install", Config.TEST_APK_PATH);
        return null;
    }

    @Override
    public void onSuccess(Void result) {
        if (mListener != null) {
            mListener.onReleased();
        }
    }

    public void execute() {
        ThreadUtils.executeByIo(this);
    }
}

public static void releaseInstallApk(final OnReleasedListener listener) {
    if (!FileUtils.isFileExists(Config.TEST_APK_PATH)) {
        new ReleaseInstallApkTask(listener).execute();
    } else {
        if (listener != null) {
            listener.onReleased();
        }
        LogUtils.d("test apk existed.");
    }
}

是不是瞬间清爽了很多,如果执行成功的回调中涉及了 View 相关的操作,那么你需要在 destroy 中取消 task 的执行哦,否则会内存泄漏哦,继续你上面的例子为例,代码如下所示:

public class XXActivity extends Activity {
    ···
    
    @Override
    protected void onDestroy() {
        // ThreadUtils.cancel(releaseInstallApkTask); 或者下面的取消都可以
        releaseInstallApkTask.cancel();
        super.onDestroy();
    }
}

以上是以 SimpleTask 为例,Task 的话会多两个回调,onCancel() 和 onFail(Throwable t),它们和 onSuccess(T result) 都是互斥的,最终回调只会走它们其中之一,并且在 Android 端是发送到主线程中执行,如果是 Java 端的话那就还是会在相应的线程池中执行,这点也方便了我做单元测试。

线程池工具类单元测试

如果遇到了异步的单测,你会发现单测很快就跑完呢,并没有等待我们线程跑完再结束,我们可以用 CountDownLatch 来等待线程的结束,或者化异步为同步的做法,这里我们使用 CountDownLatch 来实现,我进行了简单的封装,测试 Fixed 的代码如下所示:

public class ThreadUtilsTest {

    @Test
    public void executeByFixed() throws Exception {
        asyncTest(10, new TestRunnable<String>() {
            @Override
            public void run(final int index, CountDownLatch latch) {
                final TestTask<String> task = new TestTask<String>(latch) {
                    @Override
                    public String doInBackground() throws Throwable {
                        Thread.sleep(500 + index * 10);
                        if (index < 4) {
                            return Thread.currentThread() + " :" + index;
                        } else if (index < 7) {
                            cancel();
                            return null;
                        } else {
                            throw new NullPointerException(String.valueOf(index));
                        }
                    }

                    @Override
                    void onTestSuccess(String result) {
                        System.out.println(result);
                    }
                };
                ThreadUtils.executeByFixed(3, task);
            }
        });
    }

    @Test
    public void executeByFixedWithDelay() throws Exception {
        asyncTest(10, new TestRunnable<String>() {
            @Override
            public void run(final int index, CountDownLatch latch) {
                final TestTask<String> task = new TestTask<String>(latch) {
                    @Override
                    public String doInBackground() throws Throwable {
                        Thread.sleep(500);
                        if (index < 4) {
                            return Thread.currentThread() + " :" + index;
                        } else if (index < 7) {
                            cancel();
                            return null;
                        } else {
                            throw new NullPointerException(String.valueOf(index));
                        }
                    }

                    @Override
                    void onTestSuccess(String result) {
                        System.out.println(result);
                    }
                };
                ThreadUtils.executeByFixedWithDelay(3, task, 500 + index * 10, TimeUnit.MILLISECONDS);
            }
        });
    }

    @Test
    public void executeByFixedAtFixRate() throws Exception {
        asyncTest(10, new TestRunnable<String>() {
            @Override
            public void run(final int index, CountDownLatch latch) {
                final TestScheduledTask<String> task = new TestScheduledTask<String>(latch, 3) {
                    @Override
                    public String doInBackground() throws Throwable {
                        Thread.sleep(500 + index * 10);
                        if (index < 4) {
                            return Thread.currentThread() + " :" + index;
                        } else if (index < 7) {
                            cancel();
                            return null;
                        } else {
                            throw new NullPointerException(String.valueOf(index));
                        }
                    }

                    @Override
                    void onTestSuccess(String result) {
                        System.out.println(result);
                    }
                };
                ThreadUtils.executeByFixedAtFixRate(3, task, 3000 + index * 10, TimeUnit.MILLISECONDS);
            }
        });
    }

    abstract static class TestScheduledTask<T> extends ThreadUtils.Task<T> {

        private static final AtomicInteger ATOMIC_INTEGER = new AtomicInteger();
        private int mTimes;
        CountDownLatch mLatch;

        TestScheduledTask(final CountDownLatch latch, final int times) {
            mLatch = latch;
            mTimes = times;
        }

        abstract void onTestSuccess(T result);

        @Override
        public void onSuccess(T result) {
            onTestSuccess(result);
            if (ATOMIC_INTEGER.addAndGet(1) % mTimes == 0) {
                mLatch.countDown();
            }
        }

        @Override
        public void onCancel() {
            System.out.println(Thread.currentThread() + " onCancel: ");
            mLatch.countDown();
        }

        @Override
        public void onFail(Throwable t) {
            System.out.println(Thread.currentThread() + " onFail: " + t);
            mLatch.countDown();
        }
    }

    abstract static class TestTask<T> extends ThreadUtils.Task<T> {
        CountDownLatch mLatch;

        TestTask(final CountDownLatch latch) {
            mLatch = latch;
        }

        abstract void onTestSuccess(T result);

        @Override
        public void onSuccess(T result) {
            onTestSuccess(result);
            mLatch.countDown();
        }

        @Override
        public void onCancel() {
            System.out.println(Thread.currentThread() + " onCancel: ");
            mLatch.countDown();
        }

        @Override
        public void onFail(Throwable t) {
            System.out.println(Thread.currentThread() + " onFail: " + t);
            mLatch.countDown();
        }
    }

    <T> void asyncTest(int threadCount, TestRunnable<T> runnable) throws Exception {
        CountDownLatch latch = new CountDownLatch(threadCount);
        for (int i = 0; i < threadCount; i++) {
            runnable.run(i, latch);
        }
        latch.await();
    }

    interface TestRunnable<T> {
        void run(final int index, CountDownLatch latch);
    }
}

最后想说的话

感谢大家一起陪伴 AndroidUtilCode 的成长,核心工具类几乎都已囊括,也是汇集了我大量的心血,把开源做到了极致,希望大家可以用的舒心,大大提升开发效率,早日赢取白富美,走上人生巅峰。

后文再添加一个个人对 OkHttp 的线程池的使用分析,算是送上个小福利。

OkHttp 中的线程池使用

查看 OkHttp 的源码发现,不论是同步请求还是异步请求,最终都是交给 Dispatcher 做处理,我们看下该类和线程池有关的的主要代码:

public final class Dispatcher {
  // 最大请求数
  private int maxRequests = 64;
  // 相同 host 最大请求数
  private int maxRequestsPerHost = 5;
  // 请求执行线程池,懒加载
  private @Nullable ExecutorService executorService;
  // 就绪状态的异步请求队列
  private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();
  // 运行中的异步请求队列,包括还没完成的请求
  private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();

  public Dispatcher(ExecutorService executorService) {
      this.executorService = executorService;
  }

  public Dispatcher() {
  }

  public synchronized ExecutorService executorService() {
      if (executorService == null) {
          // 和 CachedThreadPool 很相似
          executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
          new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
      }
      return executorService;
  }

  synchronized void enqueue(AsyncCall call) {
    // 不超过最大请求数并且不超过 host 最大请求数
    if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
      // 添加到运行中的异步请求队列
      runningAsyncCalls.add(call);
      // 添加到线程池中运行
      executorService().execute(call);
    } else {
      // 添加到就绪的异步请求队列
      readyAsyncCalls.add(call);
    }
  }

  // 当该异步请求结束的时候,会调用此方法,用于将运行中的异步请求队列中的该请求移除并调整请求队列
  // 此时就绪队列中的请求就可以进入运行中的队列
  void finished(AsyncCall call) {
      finished(runningAsyncCalls, call, true);
  }

  private <T> void finished(Deque<T> calls, T call, boolean promoteCalls) {
      int runningCallsCount;
      Runnable idleCallback;
      synchronized (this) {
          if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
          if (promoteCalls) promoteCalls();
          runningCallsCount = runningCallsCount();
          idleCallback = this.idleCallback;
      }

      if (runningCallsCount == 0 && idleCallback != null) {
          idleCallback.run();
      }
  }

  // 根据 maxRequests 和 maxRequestsPerHost 来调整 runningAsyncCalls 和 readyAsyncCalls
  // 使运行中的异步请求不超过两种最大值,并且如果队列有空闲,将就绪状态的请求归类为运行中。
  private void promoteCalls() {
    // 如果运行中的异步队列不小于最大请求数,直接返回
    if (runningAsyncCalls.size() >= maxRequests) return;
    // 如果就绪队列为空,直接返回
    if (readyAsyncCalls.isEmpty()) return; // No ready calls to promote.
    // 遍历就绪队列并插入到运行队列
    for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
      AsyncCall call = i.next();

      if (runningCallsForHost(call) < maxRequestsPerHost) {
        i.remove();
        runningAsyncCalls.add(call);
        executorService().execute(call);
      }
      // 运行队列中的数量到达最大请求数,直接返回
      if (runningAsyncCalls.size() >= maxRequests) return;
    }
  }
}

可以发现 OkHttp 不是在线程池中维护线程的个数,线程是通过 Dispatcher 间接控制,线程池中的请求都是运行中的请求,这也就是说线程的重用不是线程池控制的,通过源码我们发现线程重用的地方是请求结束的地方 finished(AsyncCall call) ,而真正的控制是通过 promoteCalls 方法, 根据 maxRequestsmaxRequestsPerHost 来调整 runningAsyncCallsreadyAsyncCalls,使运行中的异步请求不超过两种最大值,并且如果队列有空闲,将就绪状态的请求归类为运行中。

上一篇下一篇

猜你喜欢

热点阅读