JAVA高并发实战——线程复用:线程池

2019-08-05  本文已影响0人  XHHP

(一)、什么是线程池

(二)、不要重复发明轮子:JDK对线程池的支持

1、固定大小的线程池

/**
 * Demonstrates a fixed-size thread pool: the same task is submitted ten times
 * to a pool of five worker threads.
 */
public class ThreadPoolDemo {
    /** Task that prints the id of the executing thread and then sleeps for one second. */
    public static class MyTask implements Runnable {

        @Override
        public void run() {
            System.out.println("Thread ID:" + Thread.currentThread().getId());
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                // sleep() cleared the interrupt status when it threw; set it again so
                // whoever owns this thread can still see the interruption.
                Thread.currentThread().interrupt();
                e.printStackTrace();
            }
        }
    }

    /**
     * Submits one {@link MyTask} instance ten times and then shuts the pool down.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        MyTask myTask = new MyTask();
        ExecutorService es = Executors.newFixedThreadPool(5);
        try {
            for (int i = 0; i < 10; i++) {
                es.submit(myTask);
            }
        } finally {
            // shutdown() lets already-submitted tasks finish; without it the pool's
            // worker threads stay alive and the JVM never exits.
            es.shutdown();
        }
    }
}

2、计划任务

// The three declarations below are scheduling methods of ScheduledExecutorService.
// Semantics are summarized from its Javadoc, not from code visible here.

// One-shot: runs the callable once after `delay` (expressed in `unit`).
public <V> ScheduledFuture<V> schedule(Callable<V> callable,
                                           long delay, TimeUnit unit);
// Periodic, fixed rate: first run after `initialDelay`, then runs are scheduled
// every `period`, measured from the start of the previous scheduled time.
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                  long initialDelay,
                                                  long period,
                                                  TimeUnit unit);
// Periodic, fixed delay: first run after `initialDelay`, then each run starts
// `delay` after the previous run has finished.
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                     long initialDelay,
                                                     long delay,
                                                     TimeUnit unit);
/**
 * Demonstrates fixed-rate scheduling with a {@link ScheduledExecutorService}.
 */
public class ScheduleThreadServiceDemo {
    /**
     * Schedules a task with no initial delay and a two-second period. Each run
     * sleeps for one second and then prints the current time in seconds.
     * The pool is intentionally left running so the periodic output continues.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        ScheduledExecutorService ses = Executors.newScheduledThreadPool(10);
        ses.scheduleAtFixedRate(() -> {
            try {
                Thread.sleep(1000);
                System.out.println(System.currentTimeMillis() / 1000);
            } catch (InterruptedException e) {
                // Keep the interruption visible to the pool instead of swallowing it.
                Thread.currentThread().interrupt();
                e.printStackTrace();
            }
        }, 0, 2, TimeUnit.SECONDS);
    }
}

(三)、刨根究底:核心线程池的内部实现

/**
 * JDK factory excerpt: builds a ThreadPoolExecutor whose core size and maximum
 * size are both {@code nThreads}, with a keep-alive of 0 ms and a
 * LinkedBlockingQueue created without a capacity argument as the work queue.
 *
 * @param nThreads value used for both the core and the maximum pool size
 * @return the newly constructed pool
 */
public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }
/**
 * JDK factory excerpt: builds a ThreadPoolExecutor with core size 1 and maximum
 * size 1, a keep-alive of 0 ms and a LinkedBlockingQueue created without a
 * capacity argument, then wraps it in a FinalizableDelegatedExecutorService
 * (a class declared elsewhere, not visible in this excerpt).
 *
 * @return the wrapped single-thread executor
 */
public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }
/**
 * JDK factory excerpt: builds a ThreadPoolExecutor with core size 0, maximum
 * size {@code Integer.MAX_VALUE}, a 60-second keep-alive and a SynchronousQueue
 * as the work queue.
 *
 * @return the newly constructed pool
 */
public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }
/**
 * JDK constructor excerpt: the six-argument overload. It forwards every
 * argument unchanged to another constructor of the same class and supplies
 * {@code defaultHandler} (declared elsewhere in the class) as the seventh
 * argument.
 *
 * @param corePoolSize    forwarded as the core pool size
 * @param maximumPoolSize forwarded as the maximum pool size
 * @param keepAliveTime   forwarded keep-alive amount, expressed in {@code unit}
 * @param unit            time unit of {@code keepAliveTime}
 * @param workQueue       forwarded queue of pending tasks
 * @param threadFactory   forwarded factory used to create worker threads
 */
public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             threadFactory, defaultHandler);
    }
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

(四)、拒绝策略

/**
 * Demonstrates a custom rejection handler: five workers plus a queue bounded
 * at ten entries receive a new task every 10 ms while each task takes a
 * second, so submissions that do not fit are passed to the handler.
 */
public class RejectThreadPoolDemo {
    /** Task that prints a timestamp and the executing thread id, then sleeps for one second. */
    public static class MyTask implements Runnable {

        @Override
        public void run() {
            System.out.println(System.currentTimeMillis() + ":Thread ID:" + Thread.currentThread().getId());
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                // sleep() cleared the interrupt status when it threw; set it again so
                // the thread's owner can still observe the interruption.
                Thread.currentThread().interrupt();
                e.printStackTrace();
            }
        }
    }

    /**
     * Keeps submitting the same task every 10 ms; rejected submissions are
     * reported by the custom handler instead of throwing.
     *
     * @param args unused
     * @throws InterruptedException if the submitting thread is interrupted while pausing
     */
    public static void main(String[] args) throws InterruptedException {
        MyTask task = new MyTask();
        ExecutorService es = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingDeque<>(10), Executors.defaultThreadFactory(),
                new RejectedExecutionHandler() {
                    @Override
                    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                        // Custom rejection policy: only report the task that was dropped.
                        System.out.println(r.toString() + " is discard");
                    }
                });
        try {
            for (int i = 0; i < Integer.MAX_VALUE; i++) {
                es.submit(task);
                Thread.sleep(10);
            }
        } finally {
            // Release the workers even when the loop is left through an interrupt;
            // otherwise the pool threads keep the JVM alive.
            es.shutdown();
        }
    }
}

(五)、自定义线程创建:ThreadFactory

 // The single method of the ThreadFactory interface: given the Runnable a worker
 // should execute, return the Thread that will run it.
 Thread newThread(Runnable r);
/**
 * Demonstrates a custom {@link ThreadFactory}: every worker is created as a
 * daemon thread and its creation is logged.
 */
public class ThreadFactoryDemo {
    /** Task that prints the id of the executing thread and then sleeps for one second. */
    public static class MyTask implements Runnable {

        @Override
        public void run() {
            System.out.println("Thread ID:" + Thread.currentThread().getId());
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                // sleep() cleared the interrupt status when it threw; set it again so
                // the thread's owner can still observe the interruption.
                Thread.currentThread().interrupt();
                e.printStackTrace();
            }
        }
    }

    /**
     * Submits five tasks to a pool whose threads come from the custom factory,
     * then waits two seconds before returning.
     *
     * @param args unused
     * @throws InterruptedException if the main thread is interrupted while waiting
     */
    public static void main(String[] args) throws InterruptedException {
        MyTask myTask = new MyTask();
        ThreadFactory daemonFactory = r -> {
            Thread t = new Thread(r);
            // Daemon workers do not keep the JVM alive once main returns.
            t.setDaemon(true);
            System.out.println("create" + t);
            return t;
        };
        ExecutorService executorService = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.SECONDS,
                new SynchronousQueue<>(), daemonFactory);

        for (int i = 0; i < 5; i++) {
            executorService.submit(myTask);
        }
        // Give the daemon workers time to run before main returns.
        Thread.sleep(2000);
    }
}

(六)、拓展线程池

/**
 * Demonstrates extending {@link ThreadPoolExecutor} through its
 * {@code beforeExecute}, {@code afterExecute} and {@code terminated} hooks.
 */
public class ExtThreadPool {
    /** Named task that prints the executing thread id and its name, then sleeps 100 ms. */
    public static class MyTask implements Runnable {

        public String name;

        /**
         * @param name label printed by the task and by the pool hooks
         */
        public MyTask(String name) {
            this.name = name;
        }

        @Override
        public void run() {
            System.out.println("正在执行" + Thread.currentThread().getId() + " Task" + name);
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                // sleep() cleared the interrupt status when it threw; set it again so
                // the thread's owner can still observe the interruption.
                Thread.currentThread().interrupt();
                e.printStackTrace();
            }
        }
    }

    /**
     * Runs five named tasks, one per second, through a pool that logs before
     * and after each task and when the pool terminates.
     *
     * @param args unused
     * @throws InterruptedException if the main thread is interrupted between submissions
     */
    public static void main(String[] args) throws InterruptedException {
        ExecutorService executorService = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingDeque<>()) {

            @Override
            protected void beforeExecute(Thread t, Runnable r) {
                System.out.println("准备执行" + taskName(r));
                // ThreadPoolExecutor's Javadoc asks overrides to call super at the end of this hook.
                super.beforeExecute(t, r);
            }

            @Override
            protected void afterExecute(Runnable r, Throwable t) {
                // ...and at the beginning of this one.
                super.afterExecute(r, t);
                System.out.println("执行完成" + taskName(r));
            }

            @Override
            protected void terminated() {
                System.out.println("线程池退出");
                super.terminated();
            }
        };
        try {
            for (int i = 0; i < 5; i++) {
                MyTask myTask = new MyTask("Task" + i);
                // execute() hands the MyTask itself to the hooks; submit() would wrap it.
                executorService.execute(myTask);
                Thread.sleep(1000);
            }
        } finally {
            // Shut down even when the loop is left through an interrupt.
            executorService.shutdown();
        }
    }

    /** Returns the task's name, or its string form when the runnable is not a MyTask. */
    private static String taskName(Runnable r) {
        return (r instanceof MyTask) ? ((MyTask) r).name : String.valueOf(r);
    }

}

(七)、分而治之:Fork/Join框架

// Signature of ForkJoinPool's submit overload for ForkJoinTask: it takes a task and
// returns a ForkJoinTask<T> handle (the demo below calls get() on that handle).
public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task)
/**
 * Fork/Join task that sums every integer in the inclusive range
 * {@code [start, end]}. Small ranges are summed directly; larger ranges are
 * split into subtasks that are forked and then joined.
 */
public class CountTask extends RecursiveTask<Long> {

    /** Ranges whose span ({@code end - start}) is below this value are summed directly. */
    private static final int THRESHOLD = 10000;
    /** Number of subtasks a large range is divided into. */
    private static final int SUBTASK_COUNT = 100;

    private final long start;
    private final long end;

    /**
     * @param start first value to add (inclusive)
     * @param end   last value to add (inclusive)
     */
    public CountTask(long start, long end) {
        this.start = start;
        this.end = end;
    }

    /**
     * Computes the sum of the range, splitting it when it is too large.
     *
     * @return the sum of all integers from {@code start} to {@code end}; 0 for an empty range
     */
    @Override
    protected Long compute() {
        long sum = 0;
        if (end - start < THRESHOLD) {
            // Small enough: add the values up directly.
            for (long i = start; i <= end; i++) {
                sum += i;
            }
            return sum;
        }

        // Slice width must come from the span of the range. Using (start + end)
        // makes slices wider than the range itself when start is large (the first
        // child then equals its parent and recursion never ends) and gives
        // non-positive widths for negative ranges (values are skipped).
        long step = (end - start) / SUBTASK_COUNT;
        ArrayList<CountTask> subTasks = new ArrayList<CountTask>(SUBTASK_COUNT);
        long pos = start;
        for (int i = 0; i < SUBTASK_COUNT; i++) {
            // Each slice covers [pos, pos + step], clamped to the end of the range.
            long lastOne = Math.min(pos + step, end);
            CountTask subTask = new CountTask(pos, lastOne);
            pos += step + 1;        // the next slice starts right after this one
            subTasks.add(subTask);
            subTask.fork();         // schedule the slice asynchronously
        }
        for (CountTask t : subTasks) {
            sum += t.join();        // wait for each slice and accumulate its result
        }
        return sum;
    }

    /**
     * Sums 0..200000 on a ForkJoinPool and prints the result.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        ForkJoinPool forkJoinPool = new ForkJoinPool();
        CountTask task = new CountTask(0, 200000L);
        ForkJoinTask<Long> result = forkJoinPool.submit(task);
        try {
            long res = result.get();                // block until the result is available
            System.out.println("sum = " + res);
        } catch (InterruptedException e) {
            // get() cleared the interrupt status when it threw; set it again.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
    }
}

(八)、Guava中对线程池的拓展

  1. 特殊的DirectExecutor线程池
// Runs a task through Guava's direct executor, which (per the MoreExecutors
// Javadoc) executes it in the thread that calls execute(), and prints that
// thread's name.
public static void main(String[] args) {
    Executor executor = MoreExecutors.directExecutor();
    executor.execute(() ->
            System.out.println("I am running in " + Thread.currentThread().getName()));
}
  2. Daemon线程池
// Turns a fixed pool into an "exiting" pool with Guava and runs one task that
// prints the name of the worker thread.
public static void main(String[] args) {
    ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(2);
    // The Guava method is getExitingExecutorService; per its Javadoc it switches the
    // pool to daemon threads and adds a shutdown hook, so the pool no longer blocks
    // JVM exit.
    MoreExecutors.getExitingExecutorService(executor);
    executor.execute(() -> System.out.println(Thread.currentThread().getName()));
}
上一篇 下一篇

猜你喜欢

热点阅读