爱编程,爱生活

Java Consurrency 《Thread Pool》

2018-06-16  本文已影响10人  熬夜的猫头鹰

线程池优点

Java是支持多线程的,而线程在应用当中是稀缺资源,所以在编写程序的时候需要特别注意合理的利用线程。

Java的线程管理是在java.util.concurrent下,
接口Executor提供了执行已提交的 Runnable 任务的对象的方法。此接口解耦了任务提交和每个任务将如何运行的机制(包括线程使用的细节、调度等)。通常使用 Executor 而不是显式通过 new Thread(new(RunnableTask())).start()创建线程。

Executor executor = anExecutor;
executor.execute(new RunnableTask1());
executor.execute(new RunnableTask2());

不过, Executor 接口并没有严格地要求执行是异步的。在最简单的情况下,执行程序可以在调用者的线程中立即运行已提交的任务。如:

 class DirectExecutor implements Executor {
     public void execute(Runnable r) {
         r.run();
     }
 }
 

更常见的是,任务是在某个不是调用者线程的线程中执行的。以下执行程序将为每个任务生成一个新线程

  class ThreadPerTaskExecutor implements Executor {
     public void execute(Runnable r) {
         new Thread(r).start();
     }
 }
 

Executor已知的子接口以及实现类架构图:

[图片上传失败...(image-7ee135-1529156779808)]

下面这件介绍一下子接口、子类的用途:

ExecutorService

ExecutorService 提供了管理终止的方法,以及可为跟踪一个或多个异步任务执行状况而生成 Future 的方法。

可以关闭 ExecutorService,这将导致其拒绝新任务。提供两个方法来关闭 ExecutorService。shutdown() 方法在终止前允许执行以前提交的任务,而 shutdownNow() 方法阻止等待任务启动并试图停止当前正在执行的任务。在终止时,执行程序没有任务在执行,也没有任务在等待执行,并且无法提交新任务。应该关闭未使用的 ExecutorService 以允许回收其资源。

通过创建并返回一个可用于取消执行和/或等待完成的 Future,方法 submit 扩展了基本方法 Executor.execute(Runnable)。方法 invokeAny 和 invokeAll 是批量执行的最常用形式,它们执行任务 collection,然后等待至少一个,或全部任务完成(可使用 ExecutorCompletionService 类来编写这些方法的自定义变体)

ExecutorService#shutdown
ExecutorService#shutdownNow
ExecutorService#isShutdown
ExecutorService#isTerminated
ExecutorService#awaitTermination
ExecutorService#submit(Callable<T>)
ExecutorService#submit(Runnable, T)
ExecutorService#submit(Runnable)
ExecutorService#invokeAll(java.util.Collection<? extends Callable<T>>)
ExecutorService#invokeAll(java.util.Collection<? extends Callable<T>>, long, TimeUnit)
ExecutorService#invokeAny(java.util.Collection<? extends Callable<T>>)
ExecutorService#invokeAny(java.util.Collection<? extends Callable<T>>, long, TimeUnit)

*** submit方法返回一个Future对象,对这对象可以设置超时时间 ***

public class Main {

    static ExecutorService executorService = Executors.newFixedThreadPool(5);

    public static void main(String[] args) {
        executorService.execute(new OpenTV());
    }

    static class OpenTV implements Runnable{

        @Override
        public void run() {
            System.err.println("i have open the tv");
        }
    }

}

Doc上列举的实例:

class NetworkService implements Runnable {
    private final ServerSocket serverSocket;
    private final ExecutorService pool;

    public NetworkService(int port, int poolSize)
        throws IOException {
      serverSocket = new ServerSocket(port);
      pool = Executors.newFixedThreadPool(poolSize);
    }
 
    public void run() { // run the service
      try {
        for (;;) {
          pool.execute(new Handler(serverSocket.accept()));
        }
      } catch (IOException ex) {
        pool.shutdown();
      }
    }
  }

  class Handler implements Runnable {
    private final Socket socket;
    Handler(Socket socket) { this.socket = socket; }
    public void run() {
      // read and service request on socket
    }
 }
 

下列方法分两个阶段关闭 ExecutorService。第一阶段调用 shutdown 拒绝传入任务,然后调用 shutdownNow(如有必要)取消所有遗留的任务:

 void shutdownAndAwaitTermination(ExecutorService pool) {
   pool.shutdown(); // Disable new tasks from being submitted
   try {
     // Wait a while for existing tasks to terminate
     if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
       pool.shutdownNow(); // Cancel currently executing tasks
       // Wait a while for tasks to respond to being cancelled
       if (!pool.awaitTermination(60, TimeUnit.SECONDS))
           System.err.println("Pool did not terminate");
     }
   } catch (InterruptedException ie) {
     // (Re-)Cancel if current thread also interrupted
     pool.shutdownNow();
     // Preserve interrupt status
     Thread.currentThread().interrupt();
   }
 }
 

整体的测试类

public class ExecutorServiceMain {

    //创建一个线程池
    static ExecutorService pool = Executors.newFixedThreadPool(5);

    public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {
//        pool.execute(new OpenTV());
        Future<String> openTvFuture = pool.submit(new OpenDoor());
        System.err.println(openTvFuture.get(6L,TimeUnit.SECONDS));
        shutDown(pool);
    }

    static class OpenTV implements Runnable{

        @Override
        public void run() {
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.err.println("I have open the tv");
        }
    }

    static class OpenDoor implements Callable<String>{

        @Override
        public String call() throws Exception {
            try {
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "I love java";
        }
    }

    private static void shutDown(ExecutorService pool){
        pool.shutdown();
        try {
            if(!pool.awaitTermination(6,TimeUnit.SECONDS)){
                pool.shutdownNow();
            }
            if(!pool.awaitTermination(6,TimeUnit.SECONDS)){
                System.err.println("something is wrong");
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
            pool.shutdownNow();
            Thread.interrupted();

        }

    }


}

ScheduledExecutorService

一个 ExecutorService,可安排在给定的延迟后运行或定期执行的命令

schedule 方法使用各种延迟创建任务,并返回一个可用于取消或检查执行的任务对象。scheduleAtFixedRate 和 scheduleWithFixedDelay 方法创建并执行某些在取消前一直定期运行的任务。

用 Executor.execute(Runnable) 和 ExecutorService 的 submit 方法所提交的命令,通过所请求的 0 延迟进行安排。schedule 方法中允许出现 0 和负数延迟(但不是周期),并将这些视为一种立即执行的请求。

所有的 schedule 方法都接受相对 延迟和周期作为参数,而不是绝对的时间或日期。将以 Date 所表示的绝对时间转换成要求的形式很容易。例如,要安排在某个以后的 Date 运行,可以使用:schedule(task, date.getTime() - System.currentTimeMillis(), TimeUnit.MILLISECONDS)。但是要注意,由于网络时间同步协议、时钟漂移或其他因素的存在,因此相对延迟的期满日期不必与启用任务的当前 Date 相符。 Executors 类为此包中所提供的 ScheduledExecutorService 实现提供了便捷的工厂方法。

ScheduledExecutorService#schedule(Runnable, long, TimeUnit)
ScheduledExecutorService#schedule(Callable<V>, long, TimeUnit)
ScheduledExecutorService#scheduleAtFixedRate
ScheduledExecutorService#scheduleWithFixedDelay

ScheduledFuture<?> scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit)

创建并执行一个在给定初始延迟后首次启用的定期操作,后续操作具有给定的周期;也就是将在 initialDelay 后开始执行,然后在 initialDelay+period 后执行,接着在 initialDelay + 2 * period 后执行,依此类推。如果任务的任何一个执行遇到异常,则后续执行都会被取消。否则,只能通过执行程序的取消或终止方法来终止该任务。如果此任务的任何一个执行要花费比其周期更长的时间,则将推迟后续执行,但不会同时执行。

ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,long initialDelay,long delay,TimeUnit unit)

创建并执行一个在给定初始延迟后首次启用的定期操作,随后,在每一次执行终止和下一次执行开始之间都存在给定的延迟。如果任务的任一执行遇到异常,就会取消后续执行。否则,只能通过执行程序的取消或终止方法来终止该任务。


public class SchduledExecutorMain {

    private static final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ScheduledFuture<String> scheduled = scheduler.schedule(new RollTheBell(), 5, TimeUnit.SECONDS);
        System.err.println(scheduled.get());

        Job job = new Job();
        ScheduledFuture<?> jobScheduled = scheduler.scheduleAtFixedRate(job, 10, 10, TimeUnit.SECONDS);
        System.err.println(jobScheduled.get());
//        ExecutorServiceMain.shutDown(scheduler);
    }


    static class RollTheBell implements Callable<String> {

        @Override
        public String call() throws Exception {
            System.out.println("我是 5秒之后执行的");
            return "I love java";
        }
    }

    static class Job implements Runnable {

        public void run() {
            System.out.println("十秒之后每十秒执行一次");
        }

    }
}


AbstractExecutorService

提供 ExecutorService 执行方法的默认实现。此类使用 newTaskFor 返回的 RunnableFuture 实现 submit、invokeAny 和 invokeAll 方法,默认情况下,RunnableFuture 是此包中提供的 FutureTask 类。例如,submit(Runnable) 的实现创建了一个关联 RunnableFuture 类,该类将被执行并返回。子类可以重写 newTaskFor 方法,以返回 FutureTask 之外的 RunnableFuture 实现。


AbstractExecutorService#newTaskFor(Runnable, T)
AbstractExecutorService#newTaskFor(Callable<T>)
AbstractExecutorService#submit(Runnable)
AbstractExecutorService#submit(Runnable, T)
AbstractExecutorService#submit(Callable<T>)
AbstractExecutorService#doInvokeAny
AbstractExecutorService#invokeAny(java.util.Collection<? extends Callable<T>>)
AbstractExecutorService#invokeAny(java.util.Collection<? extends Callable<T>>, long, TimeUnit)
AbstractExecutorService#invokeAll(java.util.Collection<? extends Callable<T>>)
AbstractExecutorService#invokeAll(java.util.Collection<? extends Callable<T>>, long, TimeUnit)


ThreadPoolExecutor

ThreadPoolExecutor继承了AbstractExecutorService,总共提供了四个构造函数


ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue)
ThreadPoolExecutor(int, int, long, TimeUnit, BlockingQueue<Runnable>, ThreadFactory)
ThreadPoolExecutor(int, int, long, TimeUnit, BlockingQueue<Runnable>, RejectedExecutionHandler)
ThreadPoolExecutor(int, int, long, TimeUnit, BlockingQueue<Runnable>, ThreadFactory, RejectedExecutionHandler)

构造函数主要提供以下几个参数:

线程池提交任务


pool.execute(new Runnable() {
            @Override
            public void run() {
                // do something
            }
        });

我们也可以使用submit 方法来提交任务,它会返回一个future,那么我们可以通过这个future来判断任务是否执行成功,通过future的get方法来获取返回值,get方法会阻塞住直到任务完成,而使用get(long timeout, TimeUnit unit)方法则会阻塞一段时间后立即返回,这时有可能任务没有执行完。

Future<Object> future = executor.submit(harReturnValuetask);
try {
     Object s = future.get();
} catch (InterruptedException e) {
    // 处理中断异常
} catch (ExecutionException e) {
    // 处理无法执行任务异常
} finally {
    // 关闭线程池
    executor.shutdown();
}

线程池的关闭

public static void shutDown(ExecutorService pool){
        pool.shutdown();
        try {
            if(!pool.awaitTermination(6,TimeUnit.SECONDS)){
                pool.shutdownNow();
            }
            if(!pool.awaitTermination(6,TimeUnit.SECONDS)){
                System.err.println("something is wrong");
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
            pool.shutdownNow();
            Thread.interrupted();

        }

    }

线程池的工作原理分析

[图片上传失败...(image-4dd8a2-1529156779808)]

从上图我们可以看出,当提交一个新任务到线程池时,线程池的处理流程如下:

合理的配置线程池

要想合理的配置线程池,就必须首先分析任务特性,可以从以下几个角度来进行分析:

任务性质不同的任务可以用不同规模的线程池分开处理。CPU密集型任务配置尽可能小的线程,如配置Ncpu+1个线程的线程池。IO密集型任务则由于线程并不是一直在执行任务,则配置尽可能多的线程,如2Ncpu。混合型的任务,如果可以拆分,则将其拆分成一个CPU密集型任务和一个IO密集型任务,只要这两个任务执行的时间相差不是太大,那么分解后执行的吞吐率要高于串行执行的吞吐率,如果这两个任务执行时间相差太大,则没必要进行分解。我们可以通过Runtime.getRuntime().availableProcessors()* 方法获得当前设备的CPU个数。

优先级不同的任务可以使用优先级队列PriorityBlockingQueue来处理。它可以让优先级高的任务先得到执行,需要注意的是如果一直有优先级高的任务提交到队列里,那么优先级低的任务可能永远不能执行。

执行时间不同的任务可以交给不同规模的线程池来处理,或者也可以使用优先级队列,让执行时间短的任务先执行。

依赖数据库连接池的任务,因为线程提交SQL后需要等待数据库返回结果,如果等待的时间越长CPU空闲时间就越长,那么线程数应该设置越大,这样才能更好的利用CPU。

建议使用有界队列,有界队列能增加系统的稳定性和预警能力,可以根据需要设大一点,比如几千。

线程池的监控

通过线程池提供的参数进行监控。线程池里有一些属性在监控线程池的时候可以使用

通过扩展线程池进行监控。通过继承线程池并重写线程池的beforeExecute,afterExecute和terminated方法,我们可以在任务执行前,执行后和线程池关闭前干一些事情。如监控任务的平均执行时间,最大执行时间和最小执行时间等。这几个方法在线程池里是空方法。如:

protected void beforeExecute(Thread t, Runnable r) { }


参考:

上一篇下一篇

猜你喜欢

热点阅读