Java的五种线程池

2020-02-12  本文已影响0人  herohua

1.固定数量线程池newFixedThreadPool

创建一个可重用的固定数量的无界队列的线程池。在任何时候,最多有nThreads个活跃线程处理任务。如果在所有线程都处于活动状态时提交了其他任务,则它们将在队列中等待,直到某个线程可用为止。如果在线程池关闭之前,执行任务的过程中由于执行失败导致任何线程终止,则在执行后续任务时将使用新线程代替。线程池中的线程将一直存活直到调用shutdown方法。

源码:
public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}
使用:
private static void useFixedThreadPool() throws InterruptedException {
    final ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(10);

    // Returns the approximate number of threads that are actively
    System.out.println(executor.getActiveCount());

    IntStream.range(0, 100).boxed().forEach(i -> {
        executor.execute(() -> {
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            System.out.println(Thread.currentThread().getName() + " 1============");
        });
    });
    
    System.out.println(executor.getActiveCount());

    TimeUnit.SECONDS.sleep(3);

    System.out.println(executor.getActiveCount());
}
运行结果:
运行结果.png

2.缓存线程池useCachedThreadPool

创建一个线程池,该线程池根据需要创建新线程,如果有可获得的缓存线程,将重用已经构造的线程。该线程池通常能提高执行许多短期异步任务的程序的性能。对execute的调用将重用先前已经构造的线程(如果存在)。如果没有可用的现有线程,则将创建一个新线程并将其添加到池中。六十秒内未使用的线程将终止并从缓存中删除。因此,保持空闲时间足够长的池不会消耗任何资源。

源码:
public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}
使用:
private static void useCachedThreadPool() throws InterruptedException {
    final ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newCachedThreadPool();

    System.out.println(executor.getActiveCount());

    executor.execute(() -> {
        executor.execute(() -> {
            System.out.println("============");
        });
    });

    System.out.println(executor.getActiveCount());

    IntStream.range(0, 100).boxed().forEach(i -> {
        executor.execute(() -> {
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            System.out.println("============");
        });
    });

    System.out.println(executor.getActiveCount());

    TimeUnit.SECONDS.sleep(3);

    System.out.println(executor.getActiveCount());
}
运行结果:
运行结果.png

3.只有一个线程的线程池useSingleThreadPool

创建一个只有单个线程的线程池。等价于new FinalizableDelegatedExecutorService(Executors.newFixedThreadPool(1));返回的不是ThreadPoolExecutor的实例,仅仅暴露ExecutorService的方法。

源码:
public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}
使用:
private static void useSingleThreadPool() throws InterruptedException {
    final ExecutorService executor = Executors.newSingleThreadExecutor();

    IntStream.range(0, 100).boxed().forEach(i -> {
        executor.execute(() -> {
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            System.out.println(Thread.currentThread().getName() + "============");
        });
    });

    TimeUnit.SECONDS.sleep(3);
}
运行结果:
运行结果.png

4.延迟线程池ScheduledThreadPoolExecutor

源码:
public ScheduledThreadPoolExecutor(int corePoolSize) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue());
}
使用1:
private static void testSchedule() throws InterruptedException, ExecutionException {
    ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(2);

    ScheduledFuture<?> future = executor.schedule(() -> {
        System.out.println("=======");
    }, 2, TimeUnit.SECONDS);


    //System.out.println(future.cancel(true));

    System.out.println(future.get());
}
使用2:
private static void TestScheduledAtFixedRate() {
    ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(2);
    AtomicLong interval = new AtomicLong(0L);

    // 如果period小于任务执行的时长,则period失效,当上一个任务执行完毕之后,再执行下一个任务
    ScheduledFuture<?> scheduledFuture = executor.scheduleAtFixedRate(() -> {
        long currentTime = System.currentTimeMillis();
        if (interval.get() == 0) {
            System.out.printf("The first time trigger task at %d\n", currentTime);
        } else {
            System.out.printf("The actually spend [%d]\n", currentTime - interval.get());
        }
        try {
            TimeUnit.SECONDS.sleep(5);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        interval.set(currentTime);

    }, 3, 2, TimeUnit.SECONDS);
}
运行结果.png
使用3:
private static void testScheduleWithFixedDelay() throws InterruptedException {
    ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(2);
    System.out.println(executor.getExecuteExistingDelayedTasksAfterShutdownPolicy());
    executor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);

    AtomicLong interval = new AtomicLong(0L);

    // 上一次任务结束之后的时间+delay=下一次任务开始的时间
    executor.scheduleWithFixedDelay(() -> {

        long currentTime = System.currentTimeMillis();
        if (interval.get() == 0) {
            System.out.printf("The first time trigger task at %d\n", currentTime);
        } else {
            System.out.printf("The actually spend [%d]\n", currentTime - interval.get());
        }
        try {
            TimeUnit.SECONDS.sleep(5);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        interval.set(currentTime);

    }, 3, 2, TimeUnit.SECONDS);

    TimeUnit.SECONDS.sleep(15);

    executor.shutdown();
}
运行结果.png

5.并行线程池newWorkStealingPool

源码:
public static ExecutorService newWorkStealingPool() {
    return new ForkJoinPool
        (Runtime.getRuntime().availableProcessors(),
         ForkJoinPool.defaultForkJoinWorkerThreadFactory,
         null, true);
}
使用:
public static void main(String[] args) throws InterruptedException {

    Optional.of(Runtime.getRuntime().availableProcessors()).ifPresent(System.out::println);

    final ExecutorService executorService = Executors.newWorkStealingPool();

    final List<Callable<String>> callableList = IntStream.range(0, 20).boxed().map(i -> (Callable<String>) () -> {
        System.out.println(Thread.currentThread().getName());

        TimeUnit.SECONDS.sleep(3);

        return "Task" + i;
    }).collect(Collectors.toList());

    executorService.invokeAll(callableList).stream().map(future -> {
        try {
            return future.get();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }).forEach(System.out::println);
}
运行结果:
运行结果.png
上一篇下一篇

猜你喜欢

热点阅读