Thrift源码分析(THsHaServer)

2020-02-22  本文已影响0人  番薯和米饭

HsHaServer模式(半同步半异步)

THsHaServer类是TNonblockingServer类的子类,在TNonblockingServer模式中,采用的是用一个线程来完成对所有socket的监听和业务处理,造成了效率的低下,比如某次rpc调用是访问数据库,读取大量数据,那么这个线程就会阻塞在这里,而不能去处理其他客户端的请求,THsHaServer模式的引入则是部分解决了这些问题。THsHaServer模式中,引入一个线程池来专门进行业务处理,如下图所示:


THsHaServer

从流程图就可以看出HsHaServer是将业务处理交给了专门的线程池来处,理,从而减小了监听线程的压力。

THsHaServer里的Args类源码

当你大致读懂了TNonblockingServer类和FrameBuffer类的源码后,THsHaServer的源码相对而言比较简单。下面我们先来看看THsHaServer类里的Args类:

public static class Args extends AbstractNonblockingServerArgs<Args> {
    // 这里是在设置工作线程池的默认参数,这些参数都在后面初始化线程池用得到
    public int minWorkerThreads = 5;
    public int maxWorkerThreads = Integer.MAX_VALUE;
    private int stopTimeoutVal = 60;
    private TimeUnit stopTimeoutUnit = TimeUnit.SECONDS;
    // 工作线程池
    private ExecutorService executorService = null;

    public Args(TNonblockingServerTransport transport) {
      super(transport);
    }
    /**
     * Sets the min and max threads.
     *
     * @deprecated use {@link #minWorkerThreads(int)} and {@link #maxWorkerThreads(int)}  instead.
     */
    @Deprecated
    public Args workerThreads(int n) {
      minWorkerThreads = n;
      maxWorkerThreads = n;
      return this;
    }

    /**
     * @return what the min threads was set to.
     * @deprecated use {@link #getMinWorkerThreads()} and {@link #getMaxWorkerThreads()} instead.
     */
    @Deprecated
    public int getWorkerThreads() {
      return minWorkerThreads;
    }

    //设置线程池最少线程
    public Args minWorkerThreads(int n) {
      minWorkerThreads = n;
      return this;
    }

    //设置线程池最多线程
    public Args maxWorkerThreads(int n) {
      maxWorkerThreads = n;
      return this;
    }

    public int getMinWorkerThreads() {
      return minWorkerThreads;
    }

    public int getMaxWorkerThreads() {
      return maxWorkerThreads;
    }

    public int getStopTimeoutVal() {
      return stopTimeoutVal;
    }

    public Args stopTimeoutVal(int stopTimeoutVal) {
      this.stopTimeoutVal = stopTimeoutVal;
      return this;
    }

    public TimeUnit getStopTimeoutUnit() {
      return stopTimeoutUnit;
    }

    public Args stopTimeoutUnit(TimeUnit stopTimeoutUnit) {
      this.stopTimeoutUnit = stopTimeoutUnit;
      return this;
    }

    public ExecutorService getExecutorService() {
      return executorService;
    }

    /**
     * THsHaServer 的工作线程可以自己先初始化一个工作线程池在传进来,也可以不初始化
     * 只设定相关参数后,交由THsHaServer自己实例化工作线程
     */

    /**
     * @param executorService
     * @return
     */

    // 这里是获取传进来的工作线程
    public Args executorService(ExecutorService executorService) {
      this.executorService = executorService;
      return this;
    }
  }

THsHaServer源码

THsHaServer类里面新增了一个ExecutorService类型的参数invoker,用来指向工作线程池的,好方便后面调用工作线程池。

  // 实例化THsHaServer的时候,会把工作线程池赋值给invoker,从而方便选择器(selector)方便的调用工作线程
  private final ExecutorService invoker;

  private final Args args;

  /**
   * Create the server with the specified Args configuration
   */
  public THsHaServer(Args args) {
    super(args);

    // 实例化THsHaServer的时候,如果args.executorService存在,也就是工作线程在外面已经实例化了
    // 然后从外面传进来了,则直接赋值给THsHaServer类的invoker参数,如果不是
    // 则在这里调用createInvokerPool()方法实例化工作线程池
    invoker = args.executorService == null ? createInvokerPool(args) : args.executorService;
    this.args = args;
  }

  /**
   * {@inheritDoc}
   */

  @Override
  protected void waitForShutdown() {
    // 阻塞主线程
    joinSelector();
    gracefullyShutdownInvokerPool();
  }

  /**
   * Helper to create an invoker pool
   * 实例化工作线程池
   */
  protected static ExecutorService createInvokerPool(Args options) {
    int minWorkerThreads = options.minWorkerThreads;
    int maxWorkerThreads = options.maxWorkerThreads;
    int stopTimeoutVal = options.stopTimeoutVal;
    TimeUnit stopTimeoutUnit = options.stopTimeoutUnit;

    LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>();
    ExecutorService invoker = new ThreadPoolExecutor(minWorkerThreads,
      maxWorkerThreads, stopTimeoutVal, stopTimeoutUnit, queue);

    return invoker;
  }
  }

关闭线程池操作

THsHaServer既然多了个工作线程池,那么当selector线程关闭时,还需要关闭工作线程池,gracefullyShutdownInvokerPool()方法主要是用来关闭线程池的,使用shutdown()的方法来关闭线程,不会立即终止线程池,首先将线程池的状态设置成STOP, 然后尝试停止所有的正在执行或暂停任务的线程。此时,则不能再往线程池中添加任何新的任务,否则将会抛出RejectedExecutionException异常。但是,此时线程池不会立刻退出,直到添加到线程池中的任务都已经处理完成,才会退出。

 // 用来阻塞 调用THsHaServer的线程
  @Override
  protected void waitForShutdown() {
    joinSelector();
   //逐渐清空工作线程池
    gracefullyShutdownInvokerPool();
  }

  protected ExecutorService getInvoker() {
    return invoker;
  }

    // 用来关闭工作线程池
    protected void gracefullyShutdownInvokerPool() {
        // try to gracefully shut down the executor service
        // 使用shutdown()的方法来关闭线程池,如果线程正在处理任务,则让其处理完
        invoker.shutdown();

        // Loop until awaitTermination finally does return without a interrupted
        // exception. If we don't do this, then we'll shut down prematurely. We want
        // to let the executorService clear it's task queue, closing client sockets
        // appropriately.
        long timeoutMS = args.stopTimeoutUnit.toMillis(args.stopTimeoutVal);
        long now = System.currentTimeMillis();
        while (timeoutMS >= 0) {
            try {
                // waitTermination会一直等待,直到线程池状态为TERMINATED或者,等待的时间到达了指定的时间。
                invoker.awaitTermination(timeoutMS, TimeUnit.MILLISECONDS);
                break;
            } catch (InterruptedException ix) {
                // 等待的过程中出错了则重新计算等待的时间
                long newnow = System.currentTimeMillis();
                timeoutMS -= (newnow - now);
                now = newnow;
            }
        }
    }
                         

重头戏来了, 读到这里读者最好已经阅读了TNonblockingServer和FrameBuffer的源码,、THsHaServer通过重写了requestInvoke()方法,将业务处理交给工作线程池处理,这个方法和下个getRunnable()方法一起看, 获取用户rpc请求后,是通过调用frameBuffer.invoke()方法来调用相应的业务处理, THsHaServer里是将frameBuffer.invoke()方法封装在了Invocation类的run()方法里面, Invocation继承了Runnable类,等于将frameBuffer.invoke()方法封装在了一个Runnable类里面,让后给线程池去处理。

/**
   * We override the standard invoke method here to queue the invocation for
   * invoker service instead of immediately invoking. The thread pool takes care
   * of the rest.
   */
  @Override
  protected boolean requestInvoke(FrameBuffer frameBuffer) {
    try {
      // 通过getRunnable()获取要执行的业务处理
      Runnable invocation = getRunnable(frameBuffer);
      // 将业务处理传给工作线程池去处理
      invoker.execute(invocation);
      return true;
    } catch (RejectedExecutionException rx) {
      LOGGER.warn("ExecutorService rejected execution!", rx);
      return false;
    }
  }

  protected Runnable getRunnable(FrameBuffer frameBuffer){
    // 实例化个Invocation
    return new Invocation(frameBuffer);
  }

Invocation 类源码

/**
 * An Invocation represents a method call that is prepared to execute, given
 * an idle worker thread. It contains the input and output protocols the
 * thread's processor should use to perform the usual Thrift invocation.
 */
class Invocation implements Runnable {
  private final FrameBuffer frameBuffer;

  public Invocation(final FrameBuffer frameBuffer) {
    this.frameBuffer = frameBuffer;
  }

  public void run() {
    frameBuffer.invoke();
  }
}

总结

上一篇下一篇

猜你喜欢

热点阅读