1、看!源码之netty线程池结构
2019-01-27 本文已影响0人
starskye
Netty的线程结构图
线程池结构.jpg可以从上图看出netty的线程实现由很多种但是他们都是来自于jdk自带的Executor和ExecutorService。
这里稍微提一下Executor他很简单只有一个execut方法而这个方法只有一个参数是Runnable,对于开发者这个类型很熟悉所有线程的执行都需要有他,并且该接口也只有一个方法就是run()。
上文大概说了下Executor的结构,这里说下他的定义,他是一个执行器的定义,因为从图中可以看出他是一个接口。
ExecutorService是一个服务可以当做管理工具,他管理了执行器的创建和停止,既然是执行器服务那么就代表着他是可以管理多个执行器的否则也就没有意义,但是也是一个接口对于他的定义如下。
//停止执行器,虽然停止但是会将在停止前已经存在的任务将会尝试停止如果失败则继续执行完成为止。
void shutdown();
//停止执行器,相比上方的停止这个就暴力一些,立马停止并且取消停止前已经存在的任务并且尝试关闭正在执行的任务。
List<Runnable> shutdownNow();
//执行器服务是否停止服务(由子类实现决定)
boolean isShutdown();
//该方法是对shutdown或shutdownNow的状态获取是否终止完成。如果没有调用这两个方法那么此方法永远是false(具体情况有子类决定)
boolean isTerminated();
//等待终止服务,此方法不会有任何的操作仅仅是等待任务的执行,会有两个结果1、要么执行结束2、要么执行时间超过操作时间
boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;
//提交一个可以带返回值的任务(下方会详细介绍Future这个类)
//Callable只有一个方法 V call(),Future会默认将他的返回存到自己的成员变量中返回,调用get方法获取
<T> Future<T> submit(Callable<T> task);
//提交一个Runnable任务会默认的将传入的result存入到一个Future中在进行返回
<T> Future<T> submit(Runnable task, T result);
//提交一个Runnable任务会默认将null存入Future的成员变量中通过get获得
Future<?> submit(Runnable task);
//传入一个Callable类型集合并且等到集合处理完成一起返回,最终返回一个对应结果的Future集合。
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException;
//相比上方此方法设置了超时时间,等待任务的超时,如果超时则进行任务的取消处理,继续会放回对应的结果列表但是如果是被取消的结果则会抛出取消异常,这代表着如果在不缺点是否成功的时候需要调用isCancelled方法此方法校验当前的Future是否取消如果取消了则不要调用get因为会出异常。
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit)throws InterruptedException;
//传入指定的任务集合只要其中任何一个任务完成则取消其他任务并且返回完成的任务的返回值,这里并不是Future而是Callable的返回类型
<T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException;
//传入指定的任务集合只要启动任何一个任务完成则取消其他的任务执行并且返回第一执行完成的任务结果,如果连第一个任务执行都超时了那么不会返回结果会直接抛出TimeOut的超时异常。
<T> T invokeAny(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit)throws InterruptedException, ExecutionException, TimeoutException;
ExecutorService的定义到此结束了。接下来讲述他依赖的类型Future和Callable。
//这个接口非常简单定义一个call方法此方法只有两个结果要么抛出业务异常,要么返回计算的结果,而结果类型则是创建时设置的泛型V。
public interface Callable<V> {
V call() throws Exception;
}
public interface Future<V> {
//尝试取消任务执行,如果取消失败则可能是任务已完成或者已经取消或其他原因导致失败,如果成功分为两种情况1、任务未运行则永远不会执行2、任务已经执行mayInterruptIfRunning需要判断他是否为true如果是则尝试中断执行。
boolean cancel(boolean mayInterruptIfRunning);
//如果是主动取消的则返回true,否则任何形式的结束此方法都是false
boolean isCancelled();
//在任何情况下都会返回true,除非是新new的调用此方法是false具体看子类状态的实现。
boolean isDone();
//等待计算完成获取结果,有等待就代表是阻塞的。如果执行异常则会抛出
V get() throws InterruptedException, ExecutionException;
//等待指定的时间获取结果如果超时则抛出TimeoutException,如果执行异常则会抛出
V get(long timeout, TimeUnit unit)throws InterruptedException, ExecutionException,TimeoutException;
}
这里介绍完了ExecutorService的结构和他依赖的结构,因为都是定义所以就只用知道他们是干嘛的使用者又是怎么组合这些定义最终完成什么功能的,所以接下来还是定义。。。,不过是关于ScheduledExecutorService的定义,细心的读者会发现笔者的讲解流程是根据线程池的结构图一层一层讲解的。
//可以看出他继承与ExecutorService,并且对ExecutorService的结构进行了扩展
public interface ScheduledExecutorService extends ExecutorService {
//创建一个延迟执行的任务,此任务延迟时间有delay与unit控制,返回值Future的get方法返回null,因为run方法并没有返回值所以返回默认值
public ScheduledFuture<?> schedule(Runnable command,long delay, TimeUnit unit);
//是上方方法的重构,将Runnable改为Callable,因为传入的试Callable所以在Future的get方法返回call方法的返回值,看不明白的读者可以根据上方的Callable的讲解进行理解。
public <V> ScheduledFuture<V> schedule(Callable<V> callable,long delay, TimeUnit unit);
//之前创建的延迟启动的任务都只执行一次,此方法是根据initialDelay为运行起点等待设置的启动时长,然后以period为周期的循环执行任务。除非取消任务或者中断调用者线程否则次任务永久运行,因为是永久运行所以无返回值,默认都为NULL。
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit);
//scheduleAtFixedRate的启动一样都是指定initialDelay的时长后运行,也是在指定的周期中重复运行,除非遇到异常或线程中断和取消执行否则将永久运行。
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,long initialDelay,long delay,TimeUnit unit);
//那么scheduleAtFixedRate和scheduleWithFixedDelay他们的区别是什么?
//scheduleWithFixedDelay:在计算周期的时候会将任务的完成时间算进去,意思就是如果任务执行了1s而周期是2s那么下次执行的时间会是启动时间+3s。
//对应公式:下一次执行开始时间=上一次执行结束时间+周期+上一次执行的时长
//scheduleAtFixedRate:恰恰相反,计算周期的时候不会管你的执行时间会继续按照设置的周期计算,比如任务执行了1s而周期是2s那么就会根据你的第一次启动时间+2s。
//对应公式:下一次执行开始时间=上次执行结束时间+周期
//这里会有一个疑问,那就是如果我周期设置为2s而任务执行了3s,scheduleAtFixedRate方法不是不计操作时长的吗?那么会不会出现第一次执行还未结束第二次已经开始呢?这个问题是不存在的,因为在前面讲解的时候就说了结束周期任务中有一条就是当线程中断就会结束周期,意思就是一个线程维护一个周期任务,一旦线程中断那么任务也就结束了,如果多个线程对应一个任务的时候只有一个线程在运行任务其他线程都在等待状态。
//这里这样说可能读者会纳闷这个结论怎么来的,等介绍完结构后会讲解他的实现,到时候各位再结合今天的讲解看那就一目了然了。
}
//在ScheduledExecutorService中依赖了ScheduledFuture,而从下方可以看出ScheduledFuture是一个空的接口他继承与Delayed和Future
public interface ScheduledFuture<V> extends Delayed, Future<V> {
}
//Delayed接口只定义了getDelay方法,此方法用于获取剩余的时长:getDelay=下次执行开始时间-当前时间。
public interface Delayed extends Comparable<Delayed> {
long getDelay(TimeUnit unit);
}
//可以看出Delayed类继承了Comparable,此接口是用来比较两个对象的具体看子类实现,这里说下他的返回值,当前的数比较结果:-1小于、0等于、1大于。
public interface Comparable<T> {
public int compareTo(T o);
}
顶层的基本说完了,剩下零零散散的会在使用的地方再提及,接下来就是对netty定义的讲解。
public interface EventExecutorGroup extends ScheduledExecutorService, Iterable<EventExecutor> {
//是否正在关闭,只有调用shutdownGracefully兄弟方法或者其他方法,这个是根据实现走的所以再说实现的时候在细讲。
boolean isShuttingDown();
//优美关闭线程池,怎么优美了看他实现就是了。
Future<?> shutdownGracefully();
//上方的重载,具体实现按照代码讲解,这里总结为之过早。
Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit);
//返回终止时执行的Future操作结果。实现时细讲。
Future<?> terminationFuture();
//此方法来自于ScheduledExecutorService具体讲解查看上文
@Override
@Deprecated
void shutdown();
//此方法来自于ScheduledExecutorService具体讲解查看上文
@Override
@Deprecated
List<Runnable> shutdownNow();
//返回一个由EventExecutorGroup管理的EventExecutor
EventExecutor next();
//此方法重写与Iterable接口下方单独介绍
@Override
Iterator<EventExecutor> iterator();
//此方法来自于ScheduledExecutorService具体讲解查看上文
@Override
Future<?> submit(Runnable task);
//此方法来自于ScheduledExecutorService具体讲解查看上文
@Override
<T> Future<T> submit(Runnable task, T result);
//此方法来自于ScheduledExecutorService具体讲解查看上文
@Override
<T> Future<T> submit(Callable<T> task);
//此方法来自于ScheduledExecutorService具体讲解查看上文
@Override
ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit);
//此方法来自于ScheduledExecutorService具体讲解查看上文
@Override
<V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit);
//此方法来自于ScheduledExecutorService具体讲解查看上文
@Override
ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit);
//此方法来自于ScheduledExecutorService具体讲解查看上文
@Override
ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit);
}
//此接口用来标记当前类可进行迭代并且支持for语句进行遍历
public interface Iterable<T> {
//获取当前类的迭代器,迭代器不会进行深入讲解,因为这个涉及到数据的存储方式内存等。如果讲又是一大堆,这里读者只用知道Iterator是获取元素的,常用的两个方法hasNext()是否还有下一个数据和next()返回当前数据并且做好获取下一个的准备,这里光说可能不理解,举个例子数组[1,2],调用next则从下标0获取并且下标+1,再次next则获取下标为1的那么自然返回2,再次next会发现没有了那么迭代器报错,所以就需要使用hasNext方法判断是否有下一个,而他的判断方法根据现实数据结构走,如果按上面的例子那么就是判断当前的下标是否小于数组的length。
Iterator<T> iterator();
//java8的特性接口的default方法和接受一个函数进行调用。
default void forEach(Consumer<? super T> action) {
Objects.requireNonNull(action);
for (T t : this) {
action.accept(t);
}
}
//分割迭代器,支持java8的stream操作,这里暂不进行介绍。
default Spliterator<T> spliterator() {
return Spliterators.spliteratorUnknownSize(iterator(), 0);
}
}
//此事件执行器继承与EventExecutorGroup事件执行组,这里暂不讨论他的数据结构后面实现的时候自然会出现他是怎么个结构。
//在EventExecutorGroup接口中可以看到有对EventExecutor的依赖,EventExecutorGroup的迭代就是EventExecutor。
public interface EventExecutor extends EventExecutorGroup {
//如果是EventExecutor的实现则此方法代表返回当前类this。
@Override
EventExecutor next();
//返回EventExecutor的管理者,这里可能有些迷糊在后面会在实现中细讲他们的关系。
EventExecutorGroup parent();
//判断当前线程是否是执行器的执行线程,一般是调用下面的方法传入当前的线程
boolean inEventLoop();
//传入一个线程判断是否是当前执行器的执行线程
boolean inEventLoop(Thread thread);
//返回一个新的应答,这里的Promise是Future的实现,下面会介绍
<V> Promise<V> newPromise();
//返回一个新的应答
<V> ProgressivePromise<V> newProgressivePromise();
//返回一个SucceededFuture对象并且他的isSuccess方法为true,使用get获取结果是不会阻塞会返回你传入的result。
//这里的Future并不是前面介绍的这个Future是netty自己定义的,下面会详细介绍
<V> Future<V> newSucceededFuture(V result);
//传入一个抛出的异常描述,返回FailedFuture类的一个实例,此实例与上方恰好相反,isSuccess为false,并且get的时候抛出异常。
<V> Future<V> newFailedFuture(Throwable cause);
}
//netty定义的Future可以看出他继承与jdk的Future
public interface Future<V> extends java.util.concurrent.Future<V> {
//当操作完成时返回true
boolean isSuccess();
//操作取消的时候返回true
boolean isCancellable();
//获取执行异常,执行成功则返回null
Throwable cause();
//添加执行监听器,当执行操作完成时会调用此监听器也就是isDone为true的时候调用
Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);
//添加一个集合的监听器
Future<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
//删除监听器,只会删除第一次出现的,如果一个监听器添加了多次那么只会删除第一次出现的
Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);
//与上方相同只是支持传入多个监听器进行删除
Future<V> mremoveListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
//同步等待结果,要么返回结果要么抛出异常,具体按照实现类讲解。
Future<V> sync() throws InterruptedException;
//此方法不会抛出中断异常,因为内部做了处理,这是笔者看实现总结的,当然不止一种实现所以暂时理解即可后续会详细介绍
Future<V> syncUninterruptibly();
//含义与sync方法相同
Future<V> await() throws InterruptedException;
//含义与syncUninterruptibly方法相同
Future<V> awaitUninterruptibly();
//指定任务执行的等待时长,等待结果结束后根据执行结果isDone方法结果返回,具体以实现为主
boolean await(long timeout, TimeUnit unit) throws InterruptedException;
//调用上方的方法,是指传入的timeoutMillis是毫秒
boolean await(long timeoutMillis) throws InterruptedException;
//上面的await方法如果出现中断会抛出中断异常,此方法不会抛出异常,传参和第一个await一样
boolean awaitUninterruptibly(long timeout, TimeUnit unit);
//不会抛出中断异常传参和第二个await一样
boolean awaitUninterruptibly(long timeoutMillis);
//非阻塞获取结果如果futrue执行成功那么返回call的返回值,如果失败或者还正在处理则返回null
V getNow();
//暂时与父类的含义一样,具体根据实现子类理解。
@Override
boolean cancel(boolean mayInterruptIfRunning);
}
结构到这里差不多结束了剩下的结构需要根据具体的实现查看所以这里暂不介绍,下一篇文章将会讲解netty中的future,从本文可以看出有很多关于future的使用,可以看出netty非常依赖于他那么下节将会进行future的讲解。