程序编码

Android 多线程探索(三)— 线程池

2018-02-27  本文已影响0人  Little丶Jerry

构建服务器应用程序的有效方法 — 线程池

为什么使用线程池?

每次通过 new Thread 创建线程并不是一种好的方式,每次 new Thread 新建和销毁对象性能较差,线程缺乏统一管理,可能无限制新建线程,相互之间竞争、占用过多资源导致死锁,并且缺乏定时执行、定期执行、线程中断等功能。

Java 提供了 4 种线程池,能够有效地管理、调度线程,避免过多的资源损耗。它的优点如下:

1. 重用存在的线程,减少对象创建、销毁的开销;

2. 可有效控制最大并发线程数,提高系统资源的使用率,同时避免过多资源竞争,避免堵塞;

3. 提供定时执行、定期执行、单线程、并发数控制等功能。

线程池原理简单地解释就是:会创建多个线程并且进行管理,提交给线程的任务会被线程池指派给其中的线程进行执行,通过线程池的统一调度、管理使得多线程的使用更简单、高效。

线程池

线程池都实现了 ExecutorService 接口,该接口定义了线程池需要实现的接口,如 submit、execute、shutdown 等。

public interface ExecutorService extends Executor {

    void shutdown();

    List<Runnable> shutdownNow();

    boolean isTerminated();

    boolean awaitTermination(long timeout, TimeUnit unit)
        throws InterruptedException;

    <T> Future<T> submit(Callable<T> task);

    <T> Future<T> submit(Runnable task, T result);

    Future<?> submit(Runnable task);

    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
        throws InterruptedException;

    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                  long timeout, TimeUnit unit)
        throws InterruptedException;

    <T> T invokeAny(Collection<? extends Callable<T>> tasks)
        throws InterruptedException, ExecutionException;


    <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                    long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}
public interface Executor {

    void execute(Runnable command);
}
两种常用的线程池实现:

我们一般都不会通过 new 的形式来创建线程池,因为创建参数过程相对复杂,所以,JDK 提供了一个 Executors 工厂类来简化这个过程。下面分别介绍 ThreadPoolExecutor 和 ScheduledThreadPoolExecutor 的使用。

1. 启动指定数量的线程 — ThreadPoolExecutor

ThreadPoolExecutor 是线程池的实现之一,它的功能是启动指定数量的线程以及将任务添加到一个队列中,并且将任务分发给空闲的线程。

ExecutorService 的生命周期包括 3 种状态:运行、关闭、终止。创建后便进入运行状态,当调用 shutdown() 方法时便进入关闭状态,此时 ExecutorService 不再接受新的任务,但它还在执行已经提交了的任务。当所有已经提交了的任务执行完后,就变成终止状态。

ThreadPoolExecutor 构造函数如下:

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.acc = System.getSecurityManager() == null ?
                null :
                AccessController.getContext();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }
参数名 作用
corePoolSize 线程池中所保存的核心线程数。
maximumPoolSize 线程池允许创建的最大线程数。
keepAliveTime 当前线程池线程总数大于核心线程数时,终止多余的空闲线程的时间。
unit keepAliveTime 参数的时间单位,可选值有毫秒、秒、分等。
workQueue 任务队列,如果当前线程池达到核心线程数 corePoolSize,且当前所有线程都处于活动状态时,则将新加入的任务放到此队列中。
threadFactory 线程工厂,让用户可以定制线程的创建过程,通常不需设置。
Handler 拒绝策略,当线程池与 workQueue 队列都满了的情况下,对新加任务采取的处理策略。

workQueue 有下列几个常用实现:

  1. ArrayBlockingQueue:基于数组结构的有界队列,此队列按 FIFO 原则对任务进行排序。如果队列满了还有任务进来,则调用拒绝策略。

  2. LinkedBlockingQueue:基于链表结构的无界队列,此队列按 FIFO 原则对任务进行排序。因为它是无界的,所以不会满,采用此队列后线程池将忽略拒绝策略(handler)参数,同时忽略最大线程数 maximumPoolSize 等参数。

  3. SynchronousQueue:直接将任务提交给线程而不是将它加入到队列,实际上此队列是空的。如果新任务来了线程池没有任何可用线程处理的话,则调用拒绝策略。其实要是把 maximumPoolSize 设置成无界(integer.MAX_VALUE),加上 SynchronousQueue 队列,就等同于 Executors.newCachedThreadPool()。

  4. PriorityBlockingQueue:具有优先级的队列的有界队列,可以自定义优先级,默认是按自然排序,可能很多场合并不适合。

当线程池与 workQueue 队列都满了的情况下,对新加任务采取的处理策略有如下四个默认实现:

  1. AbortPolicy:拒绝任务,抛出 RejectedExecutionException 异常。线程池的默认策略。

  2. CallerRunsPolicy:用于被拒绝任务的处理程序,它直接在 execute 方法的调用线程中运行被拒绝的任务;如果执行程序已关闭,则会丢弃该任务。

  3. DiscardOldestPolicy:如果线程池尚未关闭,则位于工作队列头部的任务将被删除,然后重试执行程序(如果再次失败,则重复此过程)。这样的结果是最后加入的任务反而有可能被执行,先前加入的都被抛弃了。

  4. DiscardPolicy:加不进的任务都被抛弃了,同时没有异常抛出。


1.1 newFixedThreadPool(int size)

对与 Android 平台来说,由于资源有限,最常使用的就是通过 Executors.newFixedThreadPool(int size) 函数来启动固定数量的线程池:

public class ExecutorDemo {

    // 任务数量
    private static final int MAX = 10;


    public static void fixedThreadPool(int size) throws ExecutionException, InterruptedException {
         // 创建固定数量的线程池
        ExecutorService executorService = Executors.newFixedThreadPool(size);

        for (int i = 1; i <= MAX; i++) {
            // 提交任务
            Future<Integer> task = executorService.submit(new Callable<Integer>() {
                @Override
                public Integer call() throws Exception {
                    System.out.println("执行线程:" + Thread.currentThread().getName());
                    return fibc(40);
                }
            });
            System.out.println("第 " + i + " 次计算,结果:" + task.get());
        }
    }

    private static int fibc(int n) {
        if (n == 0) {
            return 0;
        }

        if (n == 1) {
            return 1;
        }

        return fibc(n - 1) + fibc(n - 2);
    }
}

newFixedThreadPool(int nThreads) 的实现:

    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

在该函数中,会调用 ThreadPoolExecutor 的构造函数,设置它的 corePoolSize 和 maximumPoolSize 值都是 nThreads,并且设置 keepAliveTime 参数为 0 毫秒,最后设置无界任务队列。这样该线程池就含有固定个数的线程,并且能容纳无限个任务。

输出结果如下:

执行线程:pool-3-thread-1
第 1 次计算,结果:102334155
执行线程:pool-3-thread-2
第 2 次计算,结果:102334155
执行线程:pool-3-thread-3
第 3 次计算,结果:102334155
执行线程:pool-3-thread-1
第 4 次计算,结果:102334155
执行线程:pool-3-thread-2
第 5 次计算,结果:102334155
执行线程:pool-3-thread-3
第 6 次计算,结果:102334155
执行线程:pool-3-thread-1
第 7 次计算,结果:102334155
执行线程:pool-3-thread-2
第 8 次计算,结果:102334155
执行线程:pool-3-thread-3
第 9 次计算,结果:102334155
执行线程:pool-3-thread-1
第 10 次计算,结果:102334155

1.2 newCachedThreadPool()

线程越多,并发量越大,然而占用的内存也就越大,指定过大的线程数量并不可取。因此,我们可能需要一种场景,如果来了一个新的任务,并且没有空闲线程可用,此时必须马上创建一个线程来立即执行任务。这时就可以通过 Executors 的 newCachedThreadPool 函数来实现。

    // 创建线程池
    public static void cachedThreadPool() throws ExecutionException, InterruptedException {

        ExecutorService executorService = Executors.newCachedThreadPool();

        for (int i = 1; i <= MAX; i++) {
            // 提交任务
            Future<Integer> task = executorService.submit(new Callable<Integer>() {
                @Override
                public Integer call() throws Exception {
                    System.out.println("执行线程:" + Thread.currentThread().getName());
                    return fibc(40);
                }
            });
            System.out.println("第 " + i + " 次计算,结果:" + task.get());
        }
    }

newCachedThreadPool() 函数实现:

    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

可以看到,newCachedThreadPool 函数不需传入线程的数量。它会调用 ThreadPoolExecutor 的构造函数,设置它的
maximumPoolSize 值为无界值(Integer.MAX_VALUE),并且设置 keepAliveTime 参数为 60 秒,最后设置 SynchronousQueue
任务队列。这样就可以适应任意数量的并发任务。线程池为每个任务都创建了 1 个线程,当然这是在没有线程空闲的情况下才会创建新的线程。若一个线程中的任务已经做完了,那么这个线程可以为未被执行的任务提供执行。

输出结果如下:

执行线程:pool-3-thread-1
第 1 次计算,结果:102334155
执行线程:pool-3-thread-2
第 2 次计算,结果:102334155
执行线程:pool-3-thread-2
第 3 次计算,结果:102334155
执行线程:pool-3-thread-2
第 4 次计算,结果:102334155
执行线程:pool-3-thread-2
第 5 次计算,结果:102334155
执行线程:pool-3-thread-2
第 6 次计算,结果:102334155
执行线程:pool-3-thread-2
第 7 次计算,结果:102334155
执行线程:pool-3-thread-2
第 8 次计算,结果:102334155
执行线程:pool-3-thread-2
第 9 次计算,结果:102334155
执行线程:pool-3-thread-2
第 10 次计算,结果:102334155

2. 定时执行一些任务 — ScheduledThreadPoolExecutor

当我们需要定时执行一些任务,可以通过 ScheduledThreadPoolExecutor 来实现。通过 Executors 的 newScheduledThreadPool 函数可以很方便地创建定时执行任务的线程池。

下面是一个例子:

public class ScheduledThreadPoolDemo {

    public static void scheduledThreadPool() {
        
        // 创建定时执行的线程池
        ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(3);
        
        // 参数 2 为第一次执行任务延迟的时间,
        // 意思就是第一次调度开始时间点=当前时间 + initialDelay 
        // 参数 3 为执行周期
        scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                System.out.println("Thread: " + Thread.currentThread().getName() + ",定时计算1:");
                System.out.println("结果:" + fibc(30));
            }
        }, 1, 2, TimeUnit.SECONDS);

        scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                System.out.println("Thread: " + Thread.currentThread().getName() + ",定时计算2:");
                System.out.println("结果:" + fibc(40));
            }
        }, 1, 2, TimeUnit.SECONDS);
    }


    private static int fibc(int n) {
        if (n == 0) {
            return 0;
        }

        if (n == 1) {
            return 1;
        }

        return fibc(n - 1) + fibc(n - 2);
    }
}

public class Main {

    public static void main(String[] args) {
        // write your code here
        ScheduledThreadPoolDemo.scheduledThreadPool();

}

该线程池有 3 个线程,我们指定了 2 个定时任务,因此,该线程池有两个线程来定时完成任务。

scheduleAtFixedRate() 函数的实现:

    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                  long initialDelay,
                                                  long period,
                                                  TimeUnit unit);

scheduleAtFixedRate() 函数就是设置定时任务的方法,参数 1 是要执行的任务,参数 2 是第一次运行任务时延迟时间(第一次调度开始时间点 = 当前时间 + initialDelay ),参数 3 是定时任务的周期(两次任务调度的间隔时间),参数 4 是时间单元,这里设置为秒。

部分输出结果:

Thread: pool-2-thread-1,定时计算1:
Thread: pool-2-thread-2,定时计算2:
结果:832040
结果:102334155

Thread: pool-2-thread-1,定时计算1:
Thread: pool-2-thread-3,定时计算2:
结果:832040
结果:102334155
上一篇 下一篇

猜你喜欢

热点阅读