Java线程池学习

2018-04-12  本文已影响0人  留给时光吧

一般而言,对于并发多线程程序,需要频繁的创建线程,而许多线程很短时间内都可以执行完毕,此时会出现一个问题,就是线程的频繁创建于销毁所占用的时间和资源甚至超过了线程本身执行所需的资源。线程池的出现就是为了解决这个问题,解决的关键在于线程的复用。顾名思义,线程池就是存放线程的一个池子,需要用到线程时从池子中取线程,用完后归还,并不大量的创建与销毁。

我们先来学习最核心的类:ThreadPoolExecutor

先看他的构造方法:

ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue)
ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler)
ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory)
ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)

每个构造都有很多参数,我们依次来认识这些参数的意义:

一些方法重要的方法:

void setCorePoolSize(int corePoolSize)
void setKeepAliveTime(long time, TimeUnit unit)
void setMaximumPoolSize(int maximumPoolSize)
void setRejectedExecutionHandler(RejectedExecutionHandler handler)
void setThreadFactory(ThreadFactory threadFactory)
int getActiveCount()
long getCompletedTaskCount()  //该线程池已经完成的任务数
int getCorePoolSize()
long getKeepAliveTime(TimeUnit unit)
int getLargestPoolSize()  //获取同时存在的最大线程数
int getMaximumPoolSize()
int getPoolSize() //当前线程池中线程数
BlockingQueue<Runnable> getQueue()
RejectedExecutionHandler getRejectedExecutionHandler()
long getTaskCount() //获取曾经计划完成的任务数
ThreadFactory getThreadFactory()
ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。
ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。
ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务

示例:

public class MyTask implements Runnable {
    public String tag;
    private SimpleDateFormat format;

    MyTask(String tag, SimpleDateFormat format){
        this.tag = tag;
        this.format = format;
    }

    private String getTime(){
        return format.format(new Date());
    }

    @Override
    public void run() {
        System.out.println("开始执行:" + tag +  " " +getTime());
        try {
            Thread.currentThread().sleep(2000);
        } catch (Exception e) {
            System.out.println(tag + ": " + e.getMessage() + " " + getTime());
        }
        System.out.println("执行完毕:" + tag + " " + getTime());
    }
}

public class ThreadPool {
    public static final int CORE_SIZE = 3;
    public static final int MAXI_SIZE = 6;
    public static final int QUEUE_SIZE = 3;
    public static final int ALIVE_TIME = 100;
    public static final int SIZE = 9;

    public static void main(String[] args) {
        SimpleDateFormat format = new SimpleDateFormat("dd-MM-ss");
        ThreadPoolExecutor executor = new ThreadPoolExecutor(CORE_SIZE,MAXI_SIZE,ALIVE_TIME, TimeUnit.MILLISECONDS,new ArrayBlockingQueue<Runnable>(QUEUE_SIZE),new ThreadPoolExecutor.DiscardPolicy(){
            @Override
            public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
                super.rejectedExecution(r, e);
                System.out.println("已抛弃线程:" + ((MyTask)r).tag + " " +format.format(new Date()));
            }
        });
        for (int i = 0;i<SIZE;i++){
            MyTask task = new MyTask("task" + i,format);
            executor.execute(task);
            System.out.println("线程池中线程数目:"+executor.getPoolSize()+",队列中等待执行的任务数目:"+
                    executor.getQueue().size());
        }
        executor.shutdown();
    }
}

当SIZE为9时,打印log如下:

线程池中线程数目:1,队列中等待执行的任务数目:0
线程池中线程数目:2,队列中等待执行的任务数目:0
线程池中线程数目:3,队列中等待执行的任务数目:0
线程池中线程数目:3,队列中等待执行的任务数目:1
线程池中线程数目:3,队列中等待执行的任务数目:2
线程池中线程数目:3,队列中等待执行的任务数目:3
线程池中线程数目:4,队列中等待执行的任务数目:3
线程池中线程数目:5,队列中等待执行的任务数目:3
线程池中线程数目:6,队列中等待执行的任务数目:3
开始执行:task1 12-04-45
开始执行:task6 12-04-45
开始执行:task8 12-04-45
开始执行:task0 12-04-45
开始执行:task2 12-04-45
开始执行:task7 12-04-45
执行完毕:task1 12-04-47
执行完毕:task6 12-04-47
开始执行:task3 12-04-47
执行完毕:task8 12-04-47
开始执行:task4 12-04-47
开始执行:task5 12-04-47
执行完毕:task0 12-04-47
执行完毕:task2 12-04-47
执行完毕:task7 12-04-47
执行完毕:task4 12-04-49
执行完毕:task3 12-04-49
执行完毕:task5 12-04-49

可以清楚的看到,任务先添加到线程池中,corePoolSize满之后,进入缓存队列,当缓存对列也满之后,开始创建新进程,直到达到maximumPoolSize。同时也注意到shutdown是可以等待任务完成的,可以用shutdownNow做对比。

当SIZE为10时,打印log如下:

线程池中线程数目:1,队列中等待执行的任务数目:0
线程池中线程数目:2,队列中等待执行的任务数目:0
线程池中线程数目:3,队列中等待执行的任务数目:0
线程池中线程数目:3,队列中等待执行的任务数目:1
线程池中线程数目:3,队列中等待执行的任务数目:2
线程池中线程数目:3,队列中等待执行的任务数目:3
线程池中线程数目:4,队列中等待执行的任务数目:3
线程池中线程数目:5,队列中等待执行的任务数目:3
线程池中线程数目:6,队列中等待执行的任务数目:3
已抛弃线程:task9 12-04-44
线程池中线程数目:6,队列中等待执行的任务数目:3
开始执行:task1 12-04-44
开始执行:task6 12-04-44
开始执行:task8 12-04-44
开始执行:task0 12-04-44
开始执行:task2 12-04-44
开始执行:task7 12-04-44
执行完毕:task1 12-04-46
开始执行:task3 12-04-46
执行完毕:task6 12-04-46
开始执行:task4 12-04-46
执行完毕:task8 12-04-46
开始执行:task5 12-04-46
执行完毕:task0 12-04-46
执行完毕:task2 12-04-46
执行完毕:task7 12-04-46
执行完毕:task3 12-04-48
执行完毕:task5 12-04-48
执行完毕:task4 12-04-48

由于我们采用的DiscardPolicy策略,第10个线程最后被抛弃了。

可以看到ThreadPoolExecutor虽然做了很多简化,但用的时候依然需要配置很多东西,所以官方文档也建议在没有什么特殊要求的情况下,可以使用Executors类所提供的几个静态方法来快速建立和使用线程池

However, programmers are urged to use the more convenient Executors factory methods Executors.newCachedThreadPool() , Executors.newFixedThreadPool(int) and Executors.newSingleThreadExecutor() , that preconfigure settings for the most common usage scenarios.

接下来我们就来学习一下Executors:

这个类没有可用的构造方法,都是一些静态方法:

public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

可见只是对ThreadPoolExecutor的封装,核心线程数为0,最大进程数为Integer.MAX_VALUE,也就是说新来的任务在没有空闲线程时,都创建新线程,空闲线程空闲60s后回收。

public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

可见其核心进程和最大进程数是一样的,也就是能存在的最大线程数就是我们所指定的数目,且空闲进程存活时间为0,但也有复用机制,因为核心进程和最大进程数是一样的,由于使用的缓存队列为LinkedBlockingQueue,所以理论上有无限任务进行等待。

return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));

FinalizableDelegatedExecutorService只是一个包装类,具体还是依赖ThreadPoolExecutor。可见基本与newFixedThreadPool一样,只不过核心线程数和最大线程数都指定为1.

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
    }

可见是基于ScheduledThreadPoolExecutor实现的,这个类是继承于ThreadPoolExecutor。构造方法实现如下:

public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue());
}

核心线程数由我们指定,最大线程数为Integer.MAX_VALUE,空闲线程存活时间为0。这个类主要用于实现线程的调度。通过源码我们发现。他首先重写了execute方法(submit方法也一样):

public void execute(Runnable command) {
        schedule(command, 0, NANOSECONDS);
    }

schedule主要是延迟执行一个任务。我们主要使用下面两个方法:

scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit)
scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit)

scheduleWithFixedDelay主要是指定两次任务的间隔,如下面代码就是在起始延迟1s后,每次任务执行完毕后延迟1s,再开始下一次任务:

SimpleDateFormat format = new SimpleDateFormat("dd-MM-ss-SSSS");
ScheduledExecutorService service = Executors.newScheduledThreadPool(5);
MyTask task = new MyTask("task1" ,format);
System.out.println("start schedule" + format.format(new Date()));
service.scheduleWithFixedDelay(task,1000,1000,TimeUnit.MILLISECONDS);

打印log如下:

start schedule12-04-09-0957
开始执行:task1 12-04-10-0982
执行完毕:task1 12-04-12-0983
开始执行:task1 12-04-13-0984
执行完毕:task1 12-04-15-0984
开始执行:task1 12-04-16-0985
执行完毕:task1 12-04-18-0986

scheduleAtFixedRate主要是指定两次任务开始的间隔时间,若间隔时间小于任务执行时间,则在上一次任务结束后立即开始下一次,示例如下:

SimpleDateFormat format = new SimpleDateFormat("dd-MM-ss-SSSS");
ScheduledExecutorService service = Executors.newScheduledThreadPool(5);
MyTask task = new MyTask("task1" ,format);
System.out.println("start schedule" + format.format(new Date()));
service.scheduleAtFixedRate(task,1000,1000,TimeUnit.MILLISECONDS);

打印log如下:

start schedule12-04-38-0516
开始执行:task1 12-04-39-0532
执行完毕:task1 12-04-41-0532
开始执行:task1 12-04-41-0532
执行完毕:task1 12-04-43-0572
开始执行:task1 12-04-43-0573
执行完毕:task1 12-04-45-0573
开始执行:task1 12-04-45-0573
执行完毕:task1 12-04-47-0580
开始执行:task1 12-04-47-0580
执行完毕:task1 12-04-49-0580
开始执行:task1 12-04-49-0580

注意两个方法都是无限循环的,实际运用要加上结束代码

上一篇下一篇

猜你喜欢

热点阅读