bugstac...

Netty源码分析-03 Netty线程池

2019-03-26  本文已影响0人  史圣杰

线程池是一个在多线程场景中运用很广泛的并发框架,需要异步执行或并发执行任务的程序都可以使用线程池。有任务到来时,如果不使用线程池,我们需要不断的创建/销毁线程,还需要对线程进行管理;而使用线程池,直接将任务提交到线程池即可。使用线程池有几个好处:无需重复创建/销毁线程,降低资源消耗;提高程序响应速度;提高线程的可管理性。

3.1 实现原理

线程池内部一般包含一个核心线程池,其内部的线程在创建之后一般不会销毁,执行完任务后线程会阻塞等待新任务到来。
当向线程池提交任务时,线程池会做如下判断:

线程池执行流程

了解了实现原理,我们先来自己实现一个线程池,首先定义线程池的接口

ThreadPool
线程池的接口里面最重要的方法是execute执行任务

public interface ThreadPool<Job extends Runnable> {
    //提交一个Job,这个Job需要实现Runnable接口
    void execute(Job job);
    //关闭线程池
    void shutdown();
    //增加工作者线程
    void addWorkers(int num);
    //减少工作者线程
    void removeWorker(int num);
    //得到正在等待执行的任务数量
    int getJobSize();
}

CommonThreadPool
在实现线程池时,我们需要定义线程池的大小,以及保存任务的列表jobs,下面是变量定义:

    // 线程池最大限制数
    private static final int MAX_WORKER_NUMBERS = 100;
    // 线程池默认的数量
    private static final int DEFAULT_WORKER_NUMBERS = 1;
    // 线程池最小数量
    private static final int MIN_WORKER_NUMBERS = 1;
    // 工作列表
    private final LinkedList<Job> jobs = new LinkedList<Job>();

在线程池初始化时,我们要将核心线程池进行初始化,创建多个Worker线程,然后启动Worker线程。

// num 为DEFAULT_WORKER_NUMBERS 默认线程池大小
private void initializeWokers(int num) {
        // 创建多个线程,加入workers中,并启动
        for (int i = 0; i < num; i++) {
            Worker worker = new Worker();
            workers.add(worker);
            Thread thread = new Thread(worker, "ThreadPool-Worker-"
                    + threadNum.getAndIncrement());
            thread.start();
        }
    }

Worker启动后,一直没有任务,需要阻塞在jobs上(jobs是上面定义的任务列表),Worker等待任务到来后唤醒获取队列中的任务并执行。下面的代码中,如果jobs为空,则线程等待;

// worker的代码,首先要获取jobs的锁,
synchronized (jobs) {
                    while (jobs.isEmpty()) {// 如果jobs是空的,则执行jobs.wait,使用while而不是if,因为wait后可能已经为空了,需要继续等待
                        try {
                            jobs.wait();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                            Thread.currentThread().interrupt();// 中断
                            return;// 结束
                        }
                    }
                    job = jobs.removeFirst();// 第一个job
                    if (job != null) {
                        try {
                            job.run();//注意,这里是run而不是start,传入的Job
                        } catch (Exception e) {
                            // 忽略Job执行中的Exception
                            e.printStackTrace();
                        }
                    }
                }

提交任务时,只需要将任务加入jobs中,然后通知worker线程即可。worker线程获得锁后会取第一个任务执行。执行完毕,若jobs为空,worker线程继续进行休眠等待任务到来。

@Override
    public void execute(Job job) {
        if (job == null)
            return;
        synchronized (jobs) {
            jobs.addLast(job);
            jobs.notify();
        }
    }

完整的代码可以查看https://github.com/ssj234/JavaStudy_IO/tree/master/IOResearch/src/net/ssj/pool

3.2 Java的Executor框架

Java平台本身提供了Executor框架用来帮助我们使用线程池。

Executor框架

Executor框架最核心的类是ThreadPoolExecutor,这是各个线程池的实现类,有如下几个属性:

通过Executor框架的根据类Executors,可以创建三种基本的线程池:

FixedThreadPool

FixedThreadPool被称为可重用固定线程数的线程池。

// 获取fixedThreadPool
ExecutorService fixedThreadPool=Executors.newFixedThreadPool(paramInt);

//内部会调用下面的方法,参数 corePoolSize、maximumPoolSize、keepAliveTime、workQueue
return new ThreadPoolExecutor(paramInt, paramInt, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue());

FixedTheadPool设置的线程池大小和最大数量一样;keepAliveTime为0,代表多余的空闲线程会立刻终止;保存任务的队列使用LinkedBlockingQueue,当线程池中的线程执行完任务后,会循环反复从队列中获取任务来执行。
FixedThreadPool适用于限制当前线程数量的应用场景,适用于负载比较重的服务器。

SingleThreadExecutor

SingleThreadExecutor的核心线程池数量corePoolSize和最大数量maximumPoolSize都设置为1,适用于需要保证顺序执行的场景

ExecutorService singleThreadExecutor=Executors.newSingleThreadExecutor();

     return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue()));

CachedThreadPool

CachedThreadPool是一个会根据需要创建新线程的线程池,适用于短期异步的小任务,或负载教轻的服务器。

ExecutorService cachedThreadPool=Executors.newCachedThreadPool();

     return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue());

SynchronousQueue是一种阻塞队列,其中每个插入操作必须等待另一个线程的对应移除操作 ,反之亦然。corePoolSize是0,maximumPoolSize都最大,无界的。keepAliveTime为60秒,空闲线程超过60S会被终止。

ScheduleThreadPoolExecutor

ScheduleThreadPoolExecutor和Timer类似,可以设置延时执行或周期执行,但比Timer有更多的功能。Timer和TimerTask只创建一个线程,任务执行时间超过周期会产生一些问题。Timer创建的线程没有处理异常,因此一旦抛出非受检异常,会立刻终止。

ScheduledThreadPoolExecutor executor=new ScheduledThreadPoolExecutor(5);
//可以直接执行
executor.execute(new JobTaskR("executor", 0));
executor.execute(new JobTaskR("executor", 1));

System.out.println("5S后执行executor3");
//隔5秒后执行一次,但只会执行一次。
executor.schedule(new JobTaskR("executor", 3), 5, TimeUnit.SECONDS);

System.out.println("开始周期调度");
//设置周期执行,初始时6S后执行,之后每2s执行一次
executor.scheduleAtFixedRate(new JobTaskR("executor", 4), 6, 2, TimeUnit.SECONDS);

scheduleAtFixedRate或者scheduleWithFixedDelay方法,它们不同的是前者以固定频率执行,后者以相对固定延迟之后执行。

3.3 Netty的EventLoop与线程池

Netty的事件循环和事件循环组的实现中,类的层级关系比较复杂,其底层是Java线程池的实现,不过在netty的实际使用中还是比较简单的,我们只需要使用如下的代码即可,

EventLoopGroup bossGroup=new NioEventLoopGroup();
EventLoopGroup workGroup=new NioEventLoopGroup();
ServerBootstrap b=new ServerBootstrap();
b.group(bossGroup,workGroup)//设置事件循环组

Netty的事件循环机制有两个基本接口:EventLoop和EventLoopGroup。前者是事件循环,后者是由多个事件循环组成的组。
EventLoop自身是一个不断循环执行的线程,以NioEventLoop为例,其继承了SingleThreadEventExecutor,内部的executor是创建NioEventLoop时传入的线程池,用来将run方法放入线程池中执行;此外还包含为一个TaskQueue,netty在处理io过程中的task可以提交到这个队列中,事件循环会不断获取task并执行,因此但其本身也可以看做一个线程池。
NioEventLoop的run方法中,Nio的事件循环会不断select后获取任务并执行,然后根据ioRatio的设置执行TaskQueue的任务。NioEventLoop的execute方法中,其会将task加入到taskQueue等待事件循环执行。因此,我们可以将NioEventLoop当做一个不断执行的线程池,EventLoopGroup作为线程池组,线程池组的意义是采用给的的策略选取一个EventLoop并提交任务。

EventLoop的定义如下,其继承了一个顺序执行的线程池接口和EventLoopGroup,也就是说EventLoop之间有父子关系,通过parent();返回任务循环组,通过next()选取一个事件循环。线程池组的register用于将Netty的Channel注册到事件循环中。

public interface EventLoop extends OrderedEventExecutor, EventLoopGroup {
    @Override
    EventLoopGroup parent();
}

public interface EventLoopGroup extends EventExecutorGroup {
    EventLoop next();
    ChannelFuture register(Channel channel);
}

NioEventLoopGroup

NioEventLoopGroup除了处理网络的异步I/O任务,还用于完成异步提交的系统任务。NioEventLoopGroup初始化时,有如下几个参数可以配置,主要用于设置线程池的相关配置。

public NioEventLoopGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory,
                             final SelectorProvider selectorProvider,
                             final SelectStrategyFactory selectStrategyFactory) {
        super(nThreads, executor, chooserFactory, selectorProvider, selectStrategyFactory,
                RejectedExecutionHandlers.reject());
    }

NioEventLoopGroup初始化过程为:

  1. 如果传入的executor 为空,会默认使用ThreadPerTaskExecutor,该线程池针对每个任务会创建一个线程,创建线程方式使用DefaultThreadFactory提供的newThread方法。
  2. 初始化开始,首先会根据创建nThread个子线程池,保存在childrens变量中,创建逻辑比较简单,将初始化NioEventLoopGroup时设置的参数传递给NioEventLoop对象。在创建子线程池NioEventLoop的过程中,如果一旦有失败的,就需要关闭已经创建的所有子线程池并等待这些线程池结束。
  3. 之后,使用chooserFactory创建chooser,用来在next()选择事件循环时从childrens变量选择一个返回。默认使用2的倍数的策略,也可以设置为顺序依次选择。
  4. 向组中所有的事件循环的terminationFuture注册事件,目的是等待所有事件循环结束后将事件循环组的terminatedChildren设置为成功完成。
  5. 最后,将children复制保存为一个只读的集合,保存在变量readonlyChildren中。

至此,NioEventLoopGroup的初始化过程就结束了。我们可以看到,NioEventLoopGroup主要的用来聚合多个EventLoop,对其进行调度。

NioEventLoop

在NioEventLoopGroup的初始化过程中,会创建多个NioEventLoop,NioEventLoop用来执行实际的事件循环,初始化时有如下几个属性:

事件循环

提交任务

NioEventLoop初始化时,会创建/设置其包含的属性,最重要的是打开selector和创建tailTasks两个步骤;这时,由于没有任何任务,NioEventLoop不会启动线程。在netty中,向线程池提交任务可以使用下面的方法:

EventLoopGroup loop = new NioEventLoopGroup();
loop.next().submit(Callable<T> task)
loop.next().submit(Runnable task)
loop.next().execute(Runnable command);

也可以直接通过EventLoopGroup提交任务,只是EventLoopGroup内部会调用next()后再执行相关的方法。

EventLoopGroup loop = new NioEventLoopGroup();
loop.submit(Callable<T> task)
loop.submit(Runnable task)
loop.execute(Runnable command);

submit方法的内部会将Callable或Runnable包装后交给execute方法执行。

// AbstractExecutorService.java
public <T> Future<T> submit(Callable<T> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task); // 包装task为 ftask
        execute(ftask);
        return ftask;
    }

execute方法被NioEventLoop的父类SingleThreadEventExecutor覆盖,程序如下:

public void execute(Runnable task) {
        boolean inEventLoop = inEventLoop();
        if (inEventLoop) {
            addTask(task); // 添加到任务队列
        } else {
            startThread(); // 启动线程,向EventLoop内部的线程池提交任务,会执行NioEventLoop run
            addTask(task);
            if (isShutdown() && removeTask(task)) {
                reject();
            }
        }

        if (!addTaskWakesUp && wakesUpForTask(task)) {
            wakeup(inEventLoop);
        }
    }
  1. 判断当前线程(提交任务的线程)与当前线程池是同一个线程,也就是说是如果是当前线程池提交的任务,则直接将任务加入线程池队列即可;
  2. 如果不是,则需要启动线程后添加任务。启动线程的过程是,如果内部线程没有启动则启动,向NioEventLoop内部包含的executor提交一个任务,任务内部执行NioEventLoop的run方法也就是事件循环(executor是实际使用的线程池,初始化是传入,默认是ThreadPerTaskExecutor)。
  3. 最后根据addTaskWakesUp标志和任务是否实现了NonWakeupRunnable判断是否需要唤醒,唤醒的方法是提交一个默认的空任务WAKEUP_TASK。

3.4 事件循环解析

Nio事件循环在NioEventLoop中,主要功能:

在主循环中我们可以看到netty对I/O任务和提交到事件循环中的系统任务的调度。


EventLoop事件循环

3.4.1 I/O事件

  1. 由于NIO的I/O读写需要使用选择符,因此,netty在NioEventLoop初始化时,会使用SelectorProvider打开selector。在类加载时,netty会从系统设置中读取相关配置参数:
static {
        int selectorAutoRebuildThreshold = SystemPropertyUtil.getInt("io.netty.selectorAutoRebuildThreshold", 512);
        if (selectorAutoRebuildThreshold < MIN_PREMATURE_SELECTOR_RETURNS) {
            selectorAutoRebuildThreshold = 0;
        }
        SELECTOR_AUTO_REBUILD_THRESHOLD = selectorAutoRebuildThreshold;
    }
  1. NioEventLoop的构造方法中,会调用provider.openSelector()打开Selector;如果设置io.netty.noKeySetOptimization为true,则会启动优化,优化内容是将Selector的selectedKeys和publicSelectedKeys属性设置为可写并替换为Netty实现的集合以提供效率。
private Selector openSelector() {
        final Selector selector;
        try {
            selector = provider.openSelector();
        } catch (IOException e) {
            throw new ChannelException("failed to open a new selector", e);
        }

        if (DISABLE_KEYSET_OPTIMIZATION) {
            return selector;
        }
       //  下面是优化程序,此处省略
       ...
        return selector;
    }
  1. NioEventLoop最核心的地方在于事件循环,具体代码在NioEventLoop.java在run方法中
 protected void run() {
        for (;;) {  // 事件循环
            try {
                // select策略
                switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
                    case SelectStrategy.CONTINUE:
                        continue;
                    case SelectStrategy.SELECT:
                        select(wakenUp.getAndSet(false));  // select()
                        if (wakenUp.get()) {
                            selector.wakeup(); // 唤醒select()的线程
                        }
                    default:
                        // fallthrough
                }
            .... 后续处理
if (ioRatio == 100) {
    try {
            processSelectedKeys();
        } finally {
            runAllTasks();
       }
} else {
    final long ioStartTime = System.nanoTime();
    try {
        processSelectedKeys();
      } finally {
        final long ioTime = System.nanoTime() - ioStartTime; // io花费的时间
        runAllTasks(ioTime * (100 - ioRatio) / ioRatio); // 按照iorate计算task的时间
}
}

private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
        final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
       ......
        try {
            int readyOps = k.readyOps();
            if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
                
                int ops = k.interestOps();
                ops &= ~SelectionKey.OP_CONNECT;
                k.interestOps(ops);

                unsafe.finishConnect();
            }
            if ((readyOps & SelectionKey.OP_WRITE) != 0) {
              
                ch.unsafe().forceFlush();
            }
            if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
                unsafe.read();
                if (!ch.isOpen()) {
                    // Connection already closed - no need to handle write.
                    return;
                }
            }
        } catch (CancelledKeyException ignored) {
            unsafe.close(unsafe.voidPromise());
        }
    }

3.4.2 任务处理

protected boolean runAllTasks() {
        assert inEventLoop();
        boolean fetchedAll;
        boolean ranAtLeastOne = false;

        do {
            fetchedAll = fetchFromScheduledTaskQueue(); // 获取定时任务
            if (runAllTasksFrom(taskQueue)) {
                ranAtLeastOne = true;
            }
        } while (!fetchedAll); // keep on processing until we fetched all scheduled tasks.

        if (ranAtLeastOne) {
            lastExecutionTime = ScheduledFutureTask.nanoTime();
        }
        afterRunningAllTasks();
        return ranAtLeastOne;
    }
上一篇下一篇

猜你喜欢

热点阅读