9、看!源码之netty线程池MultithreadEventE
2019-05-20 本文已影响0人
starskye
MultithreadEventExecutorGroup的源码阅读
MultithreadEventExecutorGroup继承AbstractEventExecutorGroup的子类,而此类做了对线程的大多的实现,从名字可以看出他是多线程事件执行组,而netty是事件驱动的所以在很多定义里都有这个event事件做了标注,以下是他源码。
//可以看出他也是一个抽象类说明它内部有一些抽象方法需要子类实现去定制一些特制的功能。
public abstract class MultithreadEventExecutorGroup extends AbstractEventExecutorGroup {
//执行器数组这个EventExecutor是group管理的执行器,在next方法可以看到他是返回的此执行器
//采用了final修饰并且采用了数组说明他的长度是固定的
//需要注意固定的修饰因为后面再使用的时候会有引用
private final EventExecutor[] children;
//此set是对上方的执行器数组的一个副本,并且这个副本只读。
private final Set<EventExecutor> readonlyChildren;
//中断执行器的数量,如果group被中断则会遍历调用children的中断方法,而每个children被中断都会进行一个计数
//而terminatedChildren则是对中断children的计数,为何使用后面再中断将会讲述
private final AtomicInteger terminatedChildren = new AtomicInteger();
//中断执行的返回结果,因为需要关闭时一个执行组所以为了异步执行所以返回了一个应答然后根据用于调用去决定是等待获取结果
//还是去设置一个结果事件,之前在讲述future的时候详细介绍过,等下讲述的时候将会详细介绍他的使用
private final Promise<?> terminationFuture = new DefaultPromise(GlobalEventExecutor.INSTANCE);
//执行的选择器,什么是选择器呢,因为是线程组那么在来任务的时候将会选择使用哪个执行器去执行这个任务
//而此选择器则用到了,之前我们看到的定义next方法其实他的实现就是使用了这个选择器去返回执行器
//具体使用讲到的地方会详细说明
private final EventExecutorChooserFactory.EventExecutorChooser chooser;
//线程池(线程执行组)的构造器
//nThreads 之前说过children是限制长度的而此参数就是用来设置此线程池的线程数大小
//threadFactory 线程的创建工厂,用于创建线程
//args 在创建执行器的时候传入固定参数,使用时将会讲述
protected MultithreadEventExecutorGroup(int nThreads, ThreadFactory threadFactory, Object... args) {
//这里有个小逻辑如果传入的线程工厂不是null则把工厂包装给一个executor。如果默认传null则会用默认的线程工厂
this(nThreads, threadFactory == null ? null : new ThreadPerTaskExecutor(threadFactory), args);
}
//除了传入线程工厂还有一个做法就是传入一个executor,上一个构造就是对此构造的封装
protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
//传入了默认的执行器的选择器
this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
}
//最终操作的构造器,扩展了执行器的选择器
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
EventExecutorChooserFactory chooserFactory, Object... args) {
//如果传入的线程数小于0则抛出异常
if (nThreads <= 0) {
throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
}
//如果传入的执行器是空的则采用默认的线程工厂和默认的执行器
if (executor == null) {
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
}
//创建指定线程数的执行器数组
children = new EventExecutor[nThreads];
//遍历创建执行器
for (int i = 0; i < nThreads; i ++) {
//是否创建成功默认是false
boolean success = false;
try {
//使用了newChild方法创建执行器并且传入了executor和设置参数args
children[i] = newChild(executor, args);
//未报异常则设置true
success = true;
} catch (Exception e) {
// TODO: Think about if this is a good exception type
//创建失败则抛出异常
throw new IllegalStateException("failed to create a child event loop", e);
} finally {
//如果success是false则代表创建失败则将已经创建好的执行器进行关闭
if (!success) {
//此处则是遍历创建了i的执行去调用他的shutdown方法
for (int j = 0; j < i; j ++) {
children[j].shutdownGracefully();
}
//虽然上面调用了中断方法但是他并不会立马终止,因为内部还有内容需要执行。
for (int j = 0; j < i; j ++) {
EventExecutor e = children[j];
try {
//判断当前的执行器是否终止了如果没有则等待获取结果
while (!e.isTerminated()) {
e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
}
} catch (InterruptedException interrupted) {
//抛出异常则中断当前线程
// Let the caller handle the interruption.
Thread.currentThread().interrupt();
break;
}
}
}
}
}
//获取执行器的选择器
chooser = chooserFactory.newChooser(children);
//创建一个future的监听器用于监听终止结果
final FutureListener<Object> terminationListener = new FutureListener<Object>() {
@Override
public void operationComplete(Future<Object> future) throws Exception {
//当此执行组中的执行器被关闭的时候回调用此方法进入这里,这里进行终止数加一然后比较是否已经达到了执行器的总数
//如果没有则跳过,如果有则设置当前执行器的终止future为success为null
if (terminatedChildren.incrementAndGet() == children.length) {
terminationFuture.setSuccess(null);
}
}
};
//遍历创建好的执行器动态添加终止future的结果监听器,当监听器触发则会进入上方的内部类实现
for (EventExecutor e: children) {
e.terminationFuture().addListener(terminationListener);
}
//创建一个children的镜像set
Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
//拷贝这个set
Collections.addAll(childrenSet, children);
//并且设置此set内的所有数据不允许修改然后返回设置给readonlyChildren
readonlyChildren = Collections.unmodifiableSet(childrenSet);
}
//获取默认的线程工厂并且传入当前类名
protected ThreadFactory newDefaultThreadFactory() {
return new DefaultThreadFactory(getClass());
}
//next则是使用了选择器的next方法
@Override
public EventExecutor next() {
return chooser.next();
}
//迭代执行器的时候调用的试只读的set
@Override
public Iterator<EventExecutor> iterator() {
return readonlyChildren.iterator();
}
//获取当前执行器的数量
public final int executorCount() {
return children.length;
}
//声明了一个创建执行器的方法并且抽象的,因为每个执行器的实现都有特殊的操作所以此处抽象
protected abstract EventExecutor newChild(Executor executor, Object... args) throws Exception;
//之前说过调用线程组的关闭其实就是遍历执行器集合的关闭方法因为之前加了监听器去处理返回结果所以此处返回的future用于监听是否执行结束了
@Override
public Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {
for (EventExecutor l: children) {
l.shutdownGracefully(quietPeriod, timeout, unit);
}
return terminationFuture();
}
//获取终止结果
@Override
public Future<?> terminationFuture() {
return terminationFuture;
}
//与关闭相同只不过此方法无返回值并且调用的方法不同是执行器的shutdown
@Override
@Deprecated
public void shutdown() {
for (EventExecutor l: children) {
l.shutdown();
}
}
//上方方法相同
//这里要注意只有所有的执行器都是关闭中状态才会是true
@Override
public boolean isShuttingDown() {
for (EventExecutor l: children) {
if (!l.isShuttingDown()) {
return false;
}
}
return true;
}
//上方是关闭中这里是关闭
@Override
public boolean isShutdown() {
for (EventExecutor l: children) {
if (!l.isShutdown()) {
return false;
}
}
return true;
}
//是否终止
@Override
public boolean isTerminated() {
for (EventExecutor l: children) {
if (!l.isTerminated()) {
return false;
}
}
return true;
}
//等待时间范围是否执行终止完成
@Override
public boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException {
//计算死线时间
long deadline = System.nanoTime() + unit.toNanos(timeout);
//遍历执行器
loop: for (EventExecutor l: children) {
//死循环以便使用死线时间
for (;;) {
//如果当前的时间是大于死线时间则会小于等于0
long timeLeft = deadline - System.nanoTime();
//如果小于等于0则跳出loop就是最外层循环,不在循环
if (timeLeft <= 0) {
break loop;
}
//否则当前的线程等待计算的时间如果在时间内终止则跳出循环再次遍历下一个执行器然后计算时间再次重复操作
if (l.awaitTermination(timeLeft, TimeUnit.NANOSECONDS)) {
break;
}
}
}
//如果到了时间则获取当前的终止结果
return isTerminated();
}
}
在文中提到了线程工厂和执行器选择工厂,下面将详细介绍他们。
//此类是用于包装了线程工厂的一个执行器
//这里需要对此类有印象后面再讲述执行器实现的时候回使用到他。
//这个类很简单他的类名很清楚的讲述了,每个任务的执行线程,代表后面的执行器的实现都不会有线程的操作都是有此类进行的,这里要有印象
public final class ThreadPerTaskExecutor implements Executor {
private final ThreadFactory threadFactory;
public ThreadPerTaskExecutor(ThreadFactory threadFactory) {
if (threadFactory == null) {
throw new NullPointerException("threadFactory");
}
this.threadFactory = threadFactory;
}
//当调用此执行器时将会使用线程工厂创建一个线程去执行传入的runnable
@Override
public void execute(Runnable command) {
threadFactory.newThread(command).start();
}
}
//默认的线程工厂
public class DefaultThreadFactory implements ThreadFactory {
//线程组的id,这里组代表是工厂,因为工厂是可以new的如果不分配到时候很难看出是哪里个工厂创建的线程
//而此处采用了static代表此属性是跟类走的而不是对象所以每次创建一个工厂pool都会增加一
private static final AtomicInteger poolId = new AtomicInteger();
//创建线程的自增id
private final AtomicInteger nextId = new AtomicInteger();
//线程名前缀
private final String prefix;
//是否是守护线程
private final boolean daemon;
//当前线程的优先级
private final int priority;
//创建线程所属的线程组,可以为null系统会使用默认的线程组
protected final ThreadGroup threadGroup;
//下面是线程工厂的构造这里统一说明一下
//poolType 是Class 类型他最终会被转换成类名用于poolName的使用
//poolName 线程名但是不是完整的他会拼接一些其他数据比如poolId
//daemon 是否为守护线程除非手动设置否则默认都是false
//priority 线程的优先级 默认是NORM_PRIORITY 也是系统默认的
public DefaultThreadFactory(Class<?> poolType) {
this(poolType, false, Thread.NORM_PRIORITY);
}
public DefaultThreadFactory(String poolName) {
this(poolName, false, Thread.NORM_PRIORITY);
}
public DefaultThreadFactory(Class<?> poolType, boolean daemon) {
this(poolType, daemon, Thread.NORM_PRIORITY);
}
public DefaultThreadFactory(String poolName, boolean daemon) {
this(poolName, daemon, Thread.NORM_PRIORITY);
}
public DefaultThreadFactory(Class<?> poolType, int priority) {
this(poolType, false, priority);
}
public DefaultThreadFactory(String poolName, int priority) {
this(poolName, false, priority);
}
public DefaultThreadFactory(Class<?> poolType, boolean daemon, int priority) {
this(toPoolName(poolType), daemon, priority);
}
//此方法是将Class类型的poolType获取类名作用于线程名
public static String toPoolName(Class<?> poolType) {
if (poolType == null) {
throw new NullPointerException("poolType");
}
//根据Class获取类名
String poolName = StringUtil.simpleClassName(poolType);
//这里判断他的长度如果是0个长度则返回unknown如果一个长度则将它最小化然后返回
//如果大于一个长度则判断第一个字符是不是大写第二个字符是不是小写日过是则吧第一个字符小写拼接上后面的字符返回
//否则直接返回获取到的类名
switch (poolName.length()) {
case 0:
return "unknown";
case 1:
return poolName.toLowerCase(Locale.US);
default:
if (Character.isUpperCase(poolName.charAt(0)) && Character.isLowerCase(poolName.charAt(1))) {
return Character.toLowerCase(poolName.charAt(0)) + poolName.substring(1);
} else {
return poolName;
}
}
}
//上面遗漏了一个参数threadGroup 这个参数是线程组可以为null在这里也并没有任何使用的意义,都在创建线程后的线程设置
public DefaultThreadFactory(String poolName, boolean daemon, int priority, ThreadGroup threadGroup) {
if (poolName == null) {
throw new NullPointerException("poolName");
}
//优先级需要合法否则抛出异常
if (priority < Thread.MIN_PRIORITY || priority > Thread.MAX_PRIORITY) {
throw new IllegalArgumentException(
"priority: " + priority + " (expected: Thread.MIN_PRIORITY <= priority <= Thread.MAX_PRIORITY)");
}
//这里就是拼接线程前缀的地方,刚才处理的名字加上工厂组的id
prefix = poolName + '-' + poolId.incrementAndGet() + '-';
//下面就是一些属性的赋值没什么特殊意义
this.daemon = daemon;
this.priority = priority;
this.threadGroup = threadGroup;
}
//将线程创建的group逻辑在这里操作了一遍,为了防止组为null之前说过可以传入null因为系统会自动设置而系统设置方式和此处一样
public DefaultThreadFactory(String poolName, boolean daemon, int priority) {
this(poolName, daemon, priority, System.getSecurityManager() == null ?
Thread.currentThread().getThreadGroup() : System.getSecurityManager().getThreadGroup());
}
//创建线程
@Override
public Thread newThread(Runnable r) {
//这里可以看出它使用了一个静态方法做了包装runnable 然后使用前面工厂的前缀名拼接了线程的id号
Thread t = newThread(FastThreadLocalRunnable.wrap(r), prefix + nextId.incrementAndGet());
try {
//如果创建的线程与工厂不一致比如工厂设置的守护线程工厂daemon是true那么创建的线程是false将会进行设置
if (t.isDaemon() != daemon) {
t.setDaemon(daemon);
}
//优先级与上方一样
if (t.getPriority() != priority) {
t.setPriority(priority);
}
} catch (Exception ignored) {
// Doesn't matter even if failed to set.
}
return t;
}
//实际上就是创建了一个FastThreadLocalThread线程的子类
protected Thread newThread(Runnable r, String name) {
return new FastThreadLocalThread(threadGroup, r, name);
}
}
//上方对runnable的包装,并没有的什么可讲的
final class FastThreadLocalRunnable implements Runnable {
private final Runnable runnable;
private FastThreadLocalRunnable(Runnable runnable) {
this.runnable = ObjectUtil.checkNotNull(runnable, "runnable");
}
@Override
public void run() {
try {
runnable.run();
} finally {
FastThreadLocal.removeAll();
}
}
static Runnable wrap(Runnable runnable) {
return runnable instanceof FastThreadLocalRunnable ? runnable : new FastThreadLocalRunnable(runnable);
}
}
下面介绍之前的选择器
//工厂的定义本简单定义了一个创建选择器的方法newChooser 然后定义了内部接口EventExecutorChooser只有一个方法next
public interface EventExecutorChooserFactory {
EventExecutorChooser newChooser(EventExecutor[] executors);
interface EventExecutorChooser {
EventExecutor next();
}
}
//上面接口的默认实现
//此实现均为轮询从0 至 执行器数量-1
public final class DefaultEventExecutorChooserFactory implements EventExecutorChooserFactory {
//饿汉单例
public static final DefaultEventExecutorChooserFactory INSTANCE = new DefaultEventExecutorChooserFactory();
private DefaultEventExecutorChooserFactory() { }
//实现的获取执行器方法内部就是将下方的两个选择器通过判断执行器的个数去选择使用哪个选择
public EventExecutorChooser newChooser(EventExecutor[] executors) {
if (isPowerOfTwo(executors.length)) {
return new PowerOfTwoEventExecutorChooser(executors);
} else {
return new GenericEventExecutorChooser(executors);
}
}
//判断是否是二的幂次方
//是否是二的力量,如果是幂次方则会对运算提高效率,因为采用了&位运算而第二个算法采用了%运算这也是他们的区别
private static boolean isPowerOfTwo(int val) {
return (val & -val) == val;
}
//如果是2的幂次方则采用这个算法此算法采用了位运算&符号进行计算的所以效率较高
private static final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser {
private final AtomicInteger idx = new AtomicInteger();
private final EventExecutor[] executors;
PowerOfTwoEventExecutorChooser(EventExecutor[] executors) {
this.executors = executors;
}
@Override
public EventExecutor next() {
return executors[idx.getAndIncrement() & executors.length - 1];
}
}
//与上方的区别在于采用了%进行计算效率会有低所以设置数量时减一是2的幂次方如4、8、16、32等
private static final class GenericEventExecutorChooser implements EventExecutorChooser {
private final AtomicInteger idx = new AtomicInteger();
private final EventExecutor[] executors;
GenericEventExecutorChooser(EventExecutor[] executors) {
this.executors = executors;
}
@Override
public EventExecutor next() {
return executors[Math.abs(idx.getAndIncrement() % executors.length)];
}
}
}