线程池2

2018-09-19  本文已影响0人  永远的太阳0123
  1. 线程池
    所有的线程池中都有一个任务队列,使用的是BlockingQueue<Runnable>作为任务的载体。
    active threads - 正在执行的任务
    queued tasks - 任务队列
    completed tasks - 结束的任务

  2. FixedThreadPool
    容量固定的线程池。创建时需要传入线程池的核心容量(也是最大容量)。
    使用场景:大多数情况下使用的线程池都是FixedThreadPool。OS系统和硬件有线程支持的上限,不能随意地、无限制地提供线程池。线程池默认的容量上限是Integer.MAX_VALUE。
    常见的线程池容量:PC - 200。服务器:1000~10000。

示例:

public class Test3_FixedThreadPool {

    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(5);
        for (int i = 0; i < 6; i++) {
            executorService.execute(new Runnable() {
                public void run() {
                    try {
                        TimeUnit.MILLISECONDS.sleep(500);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println(Thread.currentThread().getName() + " - test executor");
                }
            });
        }
        // 打印线程池信息
        System.out.println(executorService);
        // 优雅关闭。不是强行关闭,而是不再处理新的任务,将已接收的任务处理完毕后关闭
        executorService.shutdown();
        // 判读是否已经结束, 相当于回收了资源。
        System.out.println(executorService.isTerminated());
        // 判断是否已经关闭, 即是否调用过shutdown方法
        System.out.println(executorService.isShutdown());
        // 打印线程池信息
        System.out.println(executorService);

        try {
            TimeUnit.SECONDS.sleep(2);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        System.out.println(executorService.isTerminated());
        System.out.println(executorService.isShutdown());
        System.out.println(executorService);
    }

}

结果:

java.util.concurrent.ThreadPoolExecutor@42a57993[Running, pool size = 5, active threads = 5, queued tasks = 1, completed tasks = 0]
false
true
java.util.concurrent.ThreadPoolExecutor@42a57993[Shutting down, pool size = 5, active threads = 5, queued tasks = 1, completed tasks = 0]
pool-1-thread-3 - test executor
pool-1-thread-1 - test executor
pool-1-thread-5 - test executor
pool-1-thread-2 - test executor
pool-1-thread-4 - test executor
pool-1-thread-3 - test executor
true
true
java.util.concurrent.ThreadPoolExecutor@42a57993[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 6]
  1. CachedThreadPool
    没有容量限制的线程池。核心容量为0,最大容量为Integer.MAX_VALUE,生命周期为60秒。
    容量管理策略:如果线程池中的线程空闲达到60秒,线程自动释放。新任务优先使用空闲的线程,如果没有空闲的线程,则新建线程。例如,当执行第二个任务时第一个任务已经执行完毕,会复用执行第一个任务的线程,而不是新建线程。
    应用场景: 内部应用或测试应用。 内部应用,有条件的内部数据瞬间处理时应用,如:电信平台夜间执行数据整理(有把握在短时间内处理完所有工作,且对硬件和软件有足够的信心)。 测试应用,在测试的时候,尝试得到硬件或软件的最高负载量,用于提供FixedThreadPool容量的指导。

示例一:

public class Test {

    public static void main(String[] args) {
        ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
        for (int i = 0; i < 10; i++) {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            cachedThreadPool.execute(new Runnable() {
                public void run() {
                    System.out.println(Thread.currentThread().getName());
                }
            });
            cachedThreadPool.execute(new Runnable() {
                public void run() {
                    System.out.println(Thread.currentThread().getName());
                }
            });
        }
        cachedThreadPool.shutdown();
    }

}

结果:

pool-1-thread-1
pool-1-thread-2
pool-1-thread-1
pool-1-thread-2
pool-1-thread-1
pool-1-thread-2
pool-1-thread-2
pool-1-thread-1
pool-1-thread-2
pool-1-thread-1
pool-1-thread-2
pool-1-thread-1
pool-1-thread-1
pool-1-thread-2
pool-1-thread-2
pool-1-thread-1
pool-1-thread-1
pool-1-thread-2
pool-1-thread-2
pool-1-thread-1

示例二:

public class Test4_CachedThreadPool {

    public static void main(String[] args) {
        ExecutorService executorService = Executors.newCachedThreadPool();
        System.out.println(executorService);

        for (int i = 0; i < 5; i++) {
            executorService.execute(new Runnable() {
                public void run() {
                    try {
                        TimeUnit.MILLISECONDS.sleep(500);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println(Thread.currentThread().getName() + " - test executor");
                }
            });
        }

        System.out.println(executorService);

        try {
            TimeUnit.SECONDS.sleep(65);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        System.out.println(executorService);
    }

}

结果:

java.util.concurrent.ThreadPoolExecutor@5c647e05[Running, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0]
java.util.concurrent.ThreadPoolExecutor@5c647e05[Running, pool size = 5, active threads = 5, queued tasks = 0, completed tasks = 0]
pool-1-thread-1 - test executor
pool-1-thread-2 - test executor
pool-1-thread-4 - test executor
pool-1-thread-3 - test executor
pool-1-thread-5 - test executor
java.util.concurrent.ThreadPoolExecutor@5c647e05[Running, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 5]
  1. ScheduledThreadPool
    计划任务线程池,即可以自动执行计划任务的线程池。
    创建时需要传入线程池的核心容量,最大容量是Integer.MAX_VALUE,生命周期永久。
    scheduleAtFixedRate(runnable, start_limit, limit, timeunit)方法:
    (1)runnable - 要执行的任务。
    (2)start_limit - 程序开始执行后,时隔多久第一次执行任务。
    (3)limit - 多次执行任务的时间间隔。可以理解为隔了limit,下一个任务开始启动。
    (4)timeunit - 间隔时间的时间单位。
    使用场景: 执行计划任务时,ScheduledThreadPool和DelaydQueue有选择地使用。例如:电信行业中的数据整理,每分钟整理,每小时整理,每天整理等。

示例:

public class Test5_ScheduledThreadPool {

    public static void main(String[] args) {
        ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(3);
        System.out.println(scheduledExecutorService);
        // 定时完成任务。 scheduleAtFixedRate(runnable, start_limit, limit, timeunit)方法
        scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    TimeUnit.MILLISECONDS.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread().getName());
            }
        }, 0, 300, TimeUnit.MILLISECONDS);
    }

}

结果:

java.util.concurrent.ScheduledThreadPoolExecutor@70dea4e[Running, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0]
pool-1-thread-1
pool-1-thread-1
pool-1-thread-2
pool-1-thread-2
pool-1-thread-3
pool-1-thread-1
pool-1-thread-1
pool-1-thread-3
pool-1-thread-3
pool-1-thread-3
pool-1-thread-3
······
  1. SingleThreadExceutor
    核心容量和最大容量都为1的线程池,生命周期永久。
    使用场景:保证任务顺序时使用。例如: 游戏大厅中的公共频道聊天,秒杀。

示例:

public class Test6_SingleThreadExecutor {

    public static void main(String[] args) {
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        System.out.println(executorService);
        for (int i = 0; i < 5; i++) {
            executorService.execute(new Runnable() {
                public void run() {
                    try {
                        TimeUnit.MILLISECONDS.sleep(500);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println(Thread.currentThread().getName() + " - test executor");
                }
            });
        }
    }

}

结果:

java.util.concurrent.Executors$FinalizableDelegatedExecutorService@70dea4e
pool-1-thread-1 - test executor
pool-1-thread-1 - test executor
pool-1-thread-1 - test executor
pool-1-thread-1 - test executor
pool-1-thread-1 - test executor
  1. ForkJoinPool
    ForkJoinPool - 分支合并线程池(mapduce类似的设计思想)。 可以使用递归完成复杂任务。要求可分支合并的任务必须是ForkJoinTask类型的子类型。ForkJoinTask类型提供了两个抽象子类型,RecursiveTask有返回结果的分支合并任务,RecursiveAction无返回结果的分支合并任务。
    compute方法:任务的执行逻辑。
    ForkJoinPool没有所谓的容量。默认都是1个线程。根据任务自动的分支新的子线程。当子线程任务结束后,自动合并。所说的自动分支和自动合并是根据fork和join两个方法实现的。
    使用场景: 主要是做科学计算或天文计算的。数据分析的。

示例:

public class Test7_ForkJoinPool {

    final static int[] numbers = new int[1000000];
    final static int MAX_SIZE = 50000;
    final static Random r = new Random();

    static {
        for (int i = 0; i < numbers.length; i++) {
            numbers[i] = r.nextInt(1000);
        }
    }

    public static void main(String[] args) throws InterruptedException, ExecutionException, IOException {
        // 普通方法
        long result = 0L;
        for (int i = 0; i < numbers.length; i++) {
            result += numbers[i];
        }
        System.out.println(result);
        // 使用ForkJoinPool
        ForkJoinPool pool = new ForkJoinPool();
        AddTask task = new AddTask(0, numbers.length);
        Future<Long> future = pool.submit(task);
        System.out.println(future.get());
    }

}

class AddTask extends RecursiveTask<Long> {
    int begin;
    int end;

    public AddTask(int begin, int end) {
        this.begin = begin;
        this.end = end;
    }
    
    protected Long compute() {
        if ((end - begin) < Test7_ForkJoinPool.MAX_SIZE) {
            long sum = 0L;
            for (int i = begin; i < end; i++) {
                sum += Test7_ForkJoinPool.numbers[i];
            }
            // System.out.println("form " + begin + " to " + end + " sum is : " + sum);
            return sum;
        } else {
            int middle = begin + (end - begin) / 2;
            AddTask task1 = new AddTask(begin, middle);
            AddTask task2 = new AddTask(middle, end);
            task1.fork();// 分支新的子线程。
            task2.fork();
            return task1.join() + task2.join();// 合并。join是一个阻塞方法,一定会获取到任务的结果
        }
    }
}
  1. ThreadPoolExecutor
    除了ForkJoinPool,四种常用的线程池底层都是使用ThreadPoolExecutor实现的。
    其中一个构造方法:public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue)
    (1)int corePoolSize - 核心容量。线程池中至少保持多少线程,核心线程不会被回收。
    (2)int maximumPoolSize - 最大容量。线程池中最多含有多少线程。
    (3)long keepAliveTime - 生命周期,即当线程空闲多久之后自动回收。0为永久。
    (4)TimeUnit unit - 声明周期的单位。
    (5)BlockingQueue<Runnable> workQueue - 任务队列,阻塞队列,它的泛型必须是Runnable。
    使用场景: 默认提供的线程池不满足条件时使用。例如:初始线程数4,最大线程数200,线程空闲周期30秒。

示例:

public class Test8_ThreadPoolExecutor {

    public static void main(String[] args) {
        // 模拟fixedThreadPool。 核心线程5个,最大容量5个,线程的生命周期永久。
        ExecutorService service = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>());

        for (int i = 0; i < 6; i++) {
            service.execute(new Runnable() {
                public void run() {
                    try {
                        TimeUnit.MILLISECONDS.sleep(500);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println(Thread.currentThread().getName() + " - test executor");
                }
            });
        }

        System.out.println(service);
        service.shutdown();
        System.out.println(service.isTerminated());
        System.out.println(service.isShutdown());
        System.out.println(service);

        try {
            TimeUnit.SECONDS.sleep(2);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        System.out.println(service.isTerminated());
        System.out.println(service.isShutdown());
        System.out.println(service);

    }

}

结果:

java.util.concurrent.ThreadPoolExecutor@42a57993[Running, pool size = 5, active threads = 5, queued tasks = 1, completed tasks = 0]
false
true
java.util.concurrent.ThreadPoolExecutor@42a57993[Shutting down, pool size = 5, active threads = 5, queued tasks = 1, completed tasks = 0]
pool-1-thread-1 - test executor
pool-1-thread-4 - test executor
pool-1-thread-5 - test executor
pool-1-thread-2 - test executor
pool-1-thread-3 - test executor
pool-1-thread-1 - test executor
true
true
java.util.concurrent.ThreadPoolExecutor@42a57993[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 6]
上一篇下一篇

猜你喜欢

热点阅读