Thrift源码分析(THsHaServer)
HsHaServer模式(半同步半异步)
THsHaServer类是TNonblockingServer类的子类,在TNonblockingServer模式中,采用的是用一个线程来完成对所有socket的监听和业务处理,造成了效率的低下,比如某次rpc调用是访问数据库,读取大量数据,那么这个线程就会阻塞在这里,而不能去处理其他客户端的请求,THsHaServer模式的引入则是部分解决了这些问题。THsHaServer模式中,引入一个线程池来专门进行业务处理,如下图所示:
THsHaServer
从流程图就可以看出HsHaServer是将业务处理交给了专门的线程池来处,理,从而减小了监听线程的压力。
THsHaServer里的Args类源码
当你大致读懂了TNonblockingServer类和FrameBuffer类的源码后,THsHaServer的源码相对而言比较简单。下面我们先来看看THsHaServer类里的Args类:
- Args类里面就新加了实例化工作线程池时候所需要的参数,这些参数有默认值,也可以自己设置。
- 工作线程可以先实例化后在传进去,也可以不设置,实例化THsHaServer的时候会在构造函数里实例化一个工作线程池
public static class Args extends AbstractNonblockingServerArgs<Args> {
// 这里是在设置工作线程池的默认参数,这些参数都在后面初始化线程池用得到
public int minWorkerThreads = 5;
public int maxWorkerThreads = Integer.MAX_VALUE;
private int stopTimeoutVal = 60;
private TimeUnit stopTimeoutUnit = TimeUnit.SECONDS;
// 工作线程池
private ExecutorService executorService = null;
public Args(TNonblockingServerTransport transport) {
super(transport);
}
/**
* Sets the min and max threads.
*
* @deprecated use {@link #minWorkerThreads(int)} and {@link #maxWorkerThreads(int)} instead.
*/
@Deprecated
public Args workerThreads(int n) {
minWorkerThreads = n;
maxWorkerThreads = n;
return this;
}
/**
* @return what the min threads was set to.
* @deprecated use {@link #getMinWorkerThreads()} and {@link #getMaxWorkerThreads()} instead.
*/
@Deprecated
public int getWorkerThreads() {
return minWorkerThreads;
}
//设置线程池最少线程
public Args minWorkerThreads(int n) {
minWorkerThreads = n;
return this;
}
//设置线程池最多线程
public Args maxWorkerThreads(int n) {
maxWorkerThreads = n;
return this;
}
public int getMinWorkerThreads() {
return minWorkerThreads;
}
public int getMaxWorkerThreads() {
return maxWorkerThreads;
}
public int getStopTimeoutVal() {
return stopTimeoutVal;
}
public Args stopTimeoutVal(int stopTimeoutVal) {
this.stopTimeoutVal = stopTimeoutVal;
return this;
}
public TimeUnit getStopTimeoutUnit() {
return stopTimeoutUnit;
}
public Args stopTimeoutUnit(TimeUnit stopTimeoutUnit) {
this.stopTimeoutUnit = stopTimeoutUnit;
return this;
}
public ExecutorService getExecutorService() {
return executorService;
}
/**
* THsHaServer 的工作线程可以自己先初始化一个工作线程池在传进来,也可以不初始化
* 只设定相关参数后,交由THsHaServer自己实例化工作线程
*/
/**
* @param executorService
* @return
*/
// 这里是获取传进来的工作线程
public Args executorService(ExecutorService executorService) {
this.executorService = executorService;
return this;
}
}
THsHaServer源码
THsHaServer类里面新增了一个ExecutorService类型的参数invoker,用来指向工作线程池的,好方便后面调用工作线程池。
// 实例化THsHaServer的时候,会把工作线程池赋值给invoker,从而方便选择器(selector)方便的调用工作线程
private final ExecutorService invoker;
private final Args args;
/**
* Create the server with the specified Args configuration
*/
public THsHaServer(Args args) {
super(args);
// 实例化THsHaServer的时候,如果args.executorService存在,也就是工作线程在外面已经实例化了
// 然后从外面传进来了,则直接赋值给THsHaServer类的invoker参数,如果不是
// 则在这里调用createInvokerPool()方法实例化工作线程池
invoker = args.executorService == null ? createInvokerPool(args) : args.executorService;
this.args = args;
}
/**
* {@inheritDoc}
*/
@Override
protected void waitForShutdown() {
// 阻塞主线程
joinSelector();
gracefullyShutdownInvokerPool();
}
/**
* Helper to create an invoker pool
* 实例化工作线程池
*/
protected static ExecutorService createInvokerPool(Args options) {
int minWorkerThreads = options.minWorkerThreads;
int maxWorkerThreads = options.maxWorkerThreads;
int stopTimeoutVal = options.stopTimeoutVal;
TimeUnit stopTimeoutUnit = options.stopTimeoutUnit;
LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>();
ExecutorService invoker = new ThreadPoolExecutor(minWorkerThreads,
maxWorkerThreads, stopTimeoutVal, stopTimeoutUnit, queue);
return invoker;
}
}
关闭线程池操作
THsHaServer既然多了个工作线程池,那么当selector线程关闭时,还需要关闭工作线程池,gracefullyShutdownInvokerPool()方法主要是用来关闭线程池的,使用shutdown()的方法来关闭线程,不会立即终止线程池,首先将线程池的状态设置成STOP, 然后尝试停止所有的正在执行或暂停任务的线程。此时,则不能再往线程池中添加任何新的任务,否则将会抛出RejectedExecutionException异常。但是,此时线程池不会立刻退出,直到添加到线程池中的任务都已经处理完成,才会退出。
// 用来阻塞 调用THsHaServer的线程
@Override
protected void waitForShutdown() {
joinSelector();
//逐渐清空工作线程池
gracefullyShutdownInvokerPool();
}
protected ExecutorService getInvoker() {
return invoker;
}
// 用来关闭工作线程池
protected void gracefullyShutdownInvokerPool() {
// try to gracefully shut down the executor service
// 使用shutdown()的方法来关闭线程池,如果线程正在处理任务,则让其处理完
invoker.shutdown();
// Loop until awaitTermination finally does return without a interrupted
// exception. If we don't do this, then we'll shut down prematurely. We want
// to let the executorService clear it's task queue, closing client sockets
// appropriately.
long timeoutMS = args.stopTimeoutUnit.toMillis(args.stopTimeoutVal);
long now = System.currentTimeMillis();
while (timeoutMS >= 0) {
try {
// waitTermination会一直等待,直到线程池状态为TERMINATED或者,等待的时间到达了指定的时间。
invoker.awaitTermination(timeoutMS, TimeUnit.MILLISECONDS);
break;
} catch (InterruptedException ix) {
// 等待的过程中出错了则重新计算等待的时间
long newnow = System.currentTimeMillis();
timeoutMS -= (newnow - now);
now = newnow;
}
}
}
重头戏来了, 读到这里读者最好已经阅读了TNonblockingServer和FrameBuffer的源码,、THsHaServer通过重写了requestInvoke()方法,将业务处理交给工作线程池处理,这个方法和下个getRunnable()方法一起看, 获取用户rpc请求后,是通过调用frameBuffer.invoke()方法来调用相应的业务处理, THsHaServer里是将frameBuffer.invoke()方法封装在了Invocation类的run()方法里面, Invocation继承了Runnable类,等于将frameBuffer.invoke()方法封装在了一个Runnable类里面,让后给线程池去处理。
/**
* We override the standard invoke method here to queue the invocation for
* invoker service instead of immediately invoking. The thread pool takes care
* of the rest.
*/
@Override
protected boolean requestInvoke(FrameBuffer frameBuffer) {
try {
// 通过getRunnable()获取要执行的业务处理
Runnable invocation = getRunnable(frameBuffer);
// 将业务处理传给工作线程池去处理
invoker.execute(invocation);
return true;
} catch (RejectedExecutionException rx) {
LOGGER.warn("ExecutorService rejected execution!", rx);
return false;
}
}
protected Runnable getRunnable(FrameBuffer frameBuffer){
// 实例化个Invocation
return new Invocation(frameBuffer);
}
Invocation 类源码
/**
* An Invocation represents a method call that is prepared to execute, given
* an idle worker thread. It contains the input and output protocols the
* thread's processor should use to perform the usual Thrift invocation.
*/
class Invocation implements Runnable {
private final FrameBuffer frameBuffer;
public Invocation(final FrameBuffer frameBuffer) {
this.frameBuffer = frameBuffer;
}
public void run() {
frameBuffer.invoke();
}
}
总结
-
THsHaServer的优点
THsHaServer与TNonblockingServer模式相比,THsHaServer在完成数据读取之后,将业务处理过程交由一个线程池来完成,主线程直接返回进行下一次循环操作,效率大大提升。 -
THsHaServer的缺点
主线程仍然需要完成所有socket的监听接收、数据读取和数据写入操作。当并发请求数较大时,且发送数据量较多时,监听socket上新连接请求不能被及时接受,瓶颈在监听socket的主线程上,因为主线程就只有一个。