Java并发编程-线程池

2018-09-12  本文已影响38人  agile4j

参考资料:《Java高并发程序设计》


1.线程池

1.线程池简介

2.JDK对线程池的支持

1.Executor框架类图

2.Executor框架提供的各种类型的线程池

1.newSingleThreadExecutor

public static ExecutorService newSingleThreadExecutor()

2.newFixedThreadPool

public static ExecutorService newFixedThreadPool(int nThreads)

3.newCachedThreadPool

public static ExecutorService newCachedThreadPool()

4.newSingleThreadScheduledExecutor

public static ScheduledExecutorService newSingleThreadScheduledExecutor()

5.newScheduledThreadPool

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)

3.线程池的使用

1.ExecutorService

public class Test {
    private static final Runnable myTask = () -> {
        System.out.println(System.currentTimeMillis() + ":Thread ID:" +
                Thread.currentThread().getId());
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    };

    public static void main(String[] args) throws Exception {
        ExecutorService es = Executors.newFixedThreadPool(5);
        for (int i = 0; i < 10; i++) {
            es.submit(myTask);
        }
    }
}

// 输出:
// 1536546327052:Thread ID:11
// 1536546327051:Thread ID:10
// 1536546327053:Thread ID:12
// 1536546327053:Thread ID:13
// 1536546327053:Thread ID:14
// 1536546328053:Thread ID:11
// 1536546328053:Thread ID:10
// 1536546328053:Thread ID:12
// 1536546328053:Thread ID:13
// 1536546328053:Thread ID:14

2.ScheduledExecutorService

// 在给定的时间对任务进行一次调度
public ScheduledFuture<?> schedule(Runnable command,
                                long delay, 
                                TimeUnit unit);
                                
// 周期性调度——以上一个任务开始时间为起点后延
// 若任务花费时间长于周期,则任务结束后立即开始下次调度    
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                            long initialDelay,
                                            long period,
                                            TimeUnit unit);

// 周期性调度——以上一个任务的结束时间为起点后延
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                long initialDelay,
                                                long delay,
                                                TimeUnit unit);
public class Test {
    public static void main(String[] args) throws Exception {
        ScheduledExecutorService ses = Executors.newScheduledThreadPool(5);
        ses.scheduleAtFixedRate(() -> {
            try {
                Thread.sleep(1000);
                System.out.println(System.currentTimeMillis() / 1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, 0, 2, TimeUnit.SECONDS);
    }
}

// 部分输出:
// 1536547666
// 1536547668
// 1536547670
// 1536547672

3.线程池的内部实现

1.ThreadPoolExecutor

public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }
    
public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }
    
public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }
// newFixedThreadPool等使用的构造函数
public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
    }
    
// 最底层的构造函数
public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler)
  1. corePoolSize:线程池中的默认线程数量
  2. maximumPoolSize:线程池中的最大线程数量
  3. keepAliveTime:当线程池线程数量超过corePoolSize时,多余的空闲线程的存活时间。即超过corePoolSize的空闲线程,在多长时间内,会被销毁
  4. unit:keepAliveTime的单位
  5. workQueue:任务队列,被提交但尚未被执行的任务
  6. threadFactory:线程工厂,用于创建线程,一般用默认的即可
  7. handler:拒绝策略。当任务太多来不及处理,如何拒绝任务

2.任务队列

1.直接提交的队列:SynchronousQueue

2.有界的任务队列:ArrayBlockingQueue

public ArrayBlockingQueue(int capacity)

3.无界的任务队列:LinkedBlockingQueue

4.优先任务队列:PriorityBlockingQueue

使用自定义线程池时,要根据应用的具体情况,选择合适的并发队列作为任务的缓冲。当线程资源紧张时,不同的并发队列对系统行为和性能的影响均不同。

3.拒绝策略

1.AbortPolicy策略

2.CallerRunsPolicy策略

3.DiscardOledestPolicy策略

4.DiscardPolicy策略

5.自定义拒绝策略

public interface RejectedExecutionHandler {
    // r:请求执行的任务
    // executor:当前的线程池
    void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}
public class Test {
    public static final Runnable myTask = () -> {
        System.out.println(System.currentTimeMillis() + ":Thread ID:" +
                Thread.currentThread().getId());
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    };


    public static void main(String[] args) throws Exception {
        ExecutorService es = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingDeque<>(10),
                Executors.defaultThreadFactory(),
                ((r, executor) -> System.out.println(r.toString() + " is discard")));
        for (int i = 0; i < Integer.MAX_VALUE; i++) {
            es.submit(myTask);
            Thread.sleep(10);
        }
    }
}

// 部分输出:
// 1536576123894:Thread ID:11
// 1536576123915:Thread ID:13
// 1536576123919:Thread ID:14
// java.util.concurrent.FutureTask@17ed40e0 is discard
// java.util.concurrent.FutureTask@50675690 is discard
// java.util.concurrent.FutureTask@31b7dea0 is discard

4.自定义线程创建:ThreadFactory

Thread newThread(Runnable r);
  1. 可以跟踪线程池究竟在何时创建了多少线程
  2. 可以自定义线程的名称、组以及优先级等信息
  3. 可以任性地将所有线程设置为守护线程
  4. ...
public static void main(String[] args) throws Exception {
    ExecutorService es = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS,
            new SynchronousQueue<>(),
            (r) -> {
                Thread t = new Thread(r);
                t.setDaemon(true);
                System.out.println("create " + t);
                return t;
            });
    for (int i = 0; i < Integer.MAX_VALUE; i++) {
        es.submit(myTask);
    }
    Thread.sleep(2000);
}

4.线程池的AOP

public class Test {
    public static class MyTask implements Runnable {
        public String name;

        public MyTask(String name) {
            this.name = name;
        }

        @Override
        public void run() {
            System.out.println("正在执行" + System.currentTimeMillis() +
                    ":Thread ID:" + Thread.currentThread().getId() +
                    ",Task Name=" + name);
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) throws Exception {
        ExecutorService es = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingDeque<>()) {

            // 任务开始时执行
            @Override
            protected void beforeExecute(Thread t, Runnable r) {
                System.out.println("准备执行:" + ((MyTask) r).name);
            }

            // 任务结束时执行
            @Override
            protected void afterExecute(Runnable r, Throwable t) {
                System.out.println("执行完毕:" + ((MyTask) r).name);
            }

            // 整个线程池退出时执行
            @Override
            protected void terminated() {
                System.out.println("线程池退出");
            }
        };
        for (int i = 0; i < 5; i++) {
            es.execute(new MyTask("TASK-" + i));
            Thread.sleep(10);
        }
        es.shutdown();
    }
}

// 输出:
// 准备执行:TASK-0
// 正在执行1536637265198:Thread ID:10,Task Name=TASK-0
// 准备执行:TASK-1
// 正在执行1536637265208:Thread ID:11,Task Name=TASK-1
// 准备执行:TASK-2
// 正在执行1536637265219:Thread ID:12,Task Name=TASK-2
// 准备执行:TASK-3
// 正在执行1536637265229:Thread ID:13,Task Name=TASK-3
// 准备执行:TASK-4
// 正在执行1536637265253:Thread ID:14,Task Name=TASK-4
// 执行完毕:TASK-0
// 执行完毕:TASK-1
// 执行完毕:TASK-2
// 执行完毕:TASK-3
// 执行完毕:TASK-4
// 线程池退出

5.线程池的线程数量

6.线程池中的异常堆栈

1.一个发生异常却没有任何错误提示的demo

public class Test {
    public static class DivTask implements Runnable {
        int a, b;

        public DivTask(int a, int b) {
            this.a = a;
            this.b = b;
        }

        @Override
        public void run() {
            System.out.println(a / b);
        }
    }

    public static void main(String[] args) throws Exception {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                0L, TimeUnit.SECONDS, new SynchronousQueue<>());
        for (int i = 0; i < 5; i++) {
            pool.submit(new DivTask(100, i));
        }
    }
}

// 输出:
// 33
// 50
// 100
// 25

2.可以得到部分异常堆栈的方法:

1.放弃submit(),改用execute()

pool.execute(new DivTask(100, i));
100
50
33
25
Exception in thread "pool-1-thread-1" java.lang.ArithmeticException: / by zero
    at com.daojia.khpt.util.base.Test$DivTask.run(Test.java:33)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

2.改造对submit()的用法

Future future = pool.submit(new DivTask(100, i));
future.get();
Exception in thread "main" java.util.concurrent.ExecutionException: java.lang.ArithmeticException: / by zero
    at java.util.concurrent.FutureTask.report(FutureTask.java:122)
    at java.util.concurrent.FutureTask.get(FutureTask.java:192)
    at com.daojia.khpt.util.base.Test.main(Test.java:42)
Caused by: java.lang.ArithmeticException: / by zero
    at com.daojia.khpt.util.base.Test$DivTask.run(Test.java:33)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

3.得到完整异常堆栈的方法:扩展ThreadPoolExecutor线程池

public class Test {

    public static class TraceableThreadPoolExecutor extends ThreadPoolExecutor {
        public TraceableThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
                                           long keepAliveTime, TimeUnit unit,
                                           BlockingQueue<Runnable> workQueue) {
            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
        }

        @Override
        public void execute(Runnable command) {
            super.execute(wrapTask(command, getClientStack(), Thread.currentThread().getName()));
        }

        @Override
        public Future<?> submit(Runnable task) {
            return super.submit(wrapTask(task, getClientStack(), Thread.currentThread().getName()));
        }

        private Exception getClientStack() {
            return new Exception("client stack trace");
        }

        private Runnable wrapTask(Runnable task, Exception clientStack, String clientThreadName) {
            return () -> {
                try {
                    task.run();
                } catch (Exception e) {
                    System.out.println("clientThreadName:" + clientThreadName);
                    clientStack.printStackTrace();
                    throw e;
                }
            };
        }
    }

    public static class DivTask implements Runnable {
        int a, b;

        public DivTask(int a, int b) {
            this.a = a;
            this.b = b;
        }

        @Override
        public void run() {
            System.out.println(a / b);
        }
    }

    public static void main(String[] args) throws Exception {
        ThreadPoolExecutor pool = new TraceableThreadPoolExecutor(0, Integer.MAX_VALUE,
                0L, TimeUnit.SECONDS, new SynchronousQueue<>());
        for (int i = 0; i < 5; i++) {
            pool.execute(new DivTask(100, i));
        }
    }
}

clientThreadName:main
java.lang.Exception: client stack trace
    at com.daojia.khpt.util.base.Test$TraceableThreadPoolExecutor.getClientStack(Test.java:43)
    at com.daojia.khpt.util.base.Test$TraceableThreadPoolExecutor.execute(Test.java:34)
100
    at com.daojia.khpt.util.base.Test.main(Test.java:77)
50
Exception in thread "pool-1-thread-1" java.lang.ArithmeticException: / by zero
33
25
    at com.daojia.khpt.util.base.Test$DivTask.run(Test.java:69)
    at com.daojia.khpt.util.base.Test$TraceableThreadPoolExecutor.lambda$wrapTask$0(Test.java:49)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

end

上一篇下一篇

猜你喜欢

热点阅读