程序员Java-多线程Android开发经验谈

Android 多线程之线程池(二)

2020-12-26  本文已影响0人  JingChen_

一 线程池中的一些重要概念

Worker / workers

Worker类是ThreadPoolExecutor类的一个内部类,也是线程池管理操作线程的核心所在。每一个worker都对应着一个thread,所以在不混淆的情况下,可以把worker理解为工作线程。

ThreadPoolExecutor有一个名为workers的成员变量,它保存了这个线程池所有存活的worker对象。

workQueue

workQueue是线程池内部用来保存待执行任务的队列。它是一个BlockingQueue<Runnable>类型的变量,在没有任务的时候,它的poll()方法会阻塞。

在一个允许超时的worker执行完任务之后,会调用workQueue.poll()取出下一个任务执行。如果没有任务,则会在这里阻塞;当阻塞时间达到超时时间后,这个工作线程会退出并销毁。

一些简单的线程池使用方法,可以看看这篇文章Android 多线程之线程池(一)

二 源码分析

ctl

ThreadPoolExecutor通过一个原子整型ctl来保存线程池的两个重要字段,workerCount和runState。workerCount即线程池工作线程的数量,而runState代表了线程池当前的状态(如:运行中、关闭、终止)。通过位运算,可以从ctl得到workerCount和runState的值,反之也可以通过workerCount和runState组合得到ctl。

//ThreadPoolExecutor.java
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    private static int runStateOf(int c)     { return c & ~CAPACITY; }
    private static int workerCountOf(int c)  { return c & CAPACITY; }
    private static int ctlOf(int rs, int wc) { return rs | wc; }

execute(runnable)

//ThreadPoolExecutor.java
    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();

        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }

当线程池执行一个任务会调这个方法进来,回想下之前讲过线程池的工作流程是怎么样的Android 多线程之线程池(一),再来看看源码会加深印象。

分析:
1、先判断任务是否为空,如果为空直接抛异常。
2、如果当前运行的线程小于核心线程数,那么就去addWorker(command, true),这个方式返回一个Boolen值,意思是创建一个worker是否成功,第一个参数是任务,第二个参数是创建的worker的线程是否为核心线程。
3、如果当前线程池是运作的,并且这个任务workQueue.offer(command)添加进队列,那么继续往下走,如果添加进队列失败,则去addWorker(command, false)创建一个普通线程并把任务对给他执行,如果创建失败,则拒绝这个任务。
4、如果3的条件重新取到ctl,如果这个线程池不是运作的,并且这个任务已经在任务队列移除了,那么直接拒绝;如果上述条件不成立则判断workerCountOf(recheck) == 0 线程池没有工作线程的时候,则去创建普通线程的worker。


线程池工作原理.png

addWorker(Runnable firstTask, boolean core)

//ThreadPoolExecutor.java
    private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                int wc = workerCountOf(c);
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    int rs = runStateOf(ctl.get());

                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

先去判断线程池的状态if (rs >= SHUTDOWN &&!(rs == SHUTDOWN &&firstTask == null && ! workQueue.isEmpty()))return false;
直接返回false有以下3种情况:
1、线程池状态为STOP、TIDYING、TERMINATED
2、线程池状态不是running状态,并且firstTask不为空
3、线程池状态不是running状态,并且工作队列为空
当返回false时则进入拒绝策略

之后去判断

int wc = workerCountOf(c);
if (wc >= CAPACITY ||
    wc >= (core ? corePoolSize : maximumPoolSize))
    return false;

addWorker的core的值,如果是核心线程,当前的workerCountOf大于corePoolSize则返回false;如果是非核心线程,当前的workerCountOf大于maximumPoolSize则返回false,当返回false时则进入拒绝策略

如果上述条件满足,compareAndIncrementWorkerCount(c)表示workerCountOf+1

接下来就是去创建一个worker了,相当于创建一个工作线程,之后把work添加到队列里面workers.add(w);如果workers.size() > largestPoolSize 当前的工作线程(包含未启动)大于最大工作线程,那么最大工作线程设置为队列的工作线程

  if (workerAdded) {
       t.start();
       workerStarted = true;
  }

执行work对象里面的Thread

ThreadPoolExecutor.Worker(Runnable firstTask)、runWorker(Worker)

//ThreadPoolExecutor.java
Worker(Runnable firstTask) {
   setState(-1); // inhibit interrupts until runWorker
   this.firstTask = firstTask;
   this.thread = getThreadFactory().newThread(this);
}

public void run() {
   runWorker(this);
}

刚才有提及到worker可以理解为工作线程,原因是worker.thread是工作线程,而worker.firstTask为该线程处理的任务

当Worker的线程开始运行之后,会调用其run()方法:runWorker(this)

// 省略一大部分
    final void runWorker(Worker w) {
        Runnable task = w.firstTask;
        while (task != null || (task = getTask()) != null) {
            try {
                task.run();
            } catch (Exception x) {
            } 
        }
    }

    // 省略一大部分
    private Runnable getTask() {
        for (;;) {
            if (/*无法获取任务*/) {
                return null;
            }
            try {
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
            } catch (InterruptedException retry) {
            }
        }
    }

为了便于观看,只留了核心的几行,如果传入的task不为null,则执行相应的run方法;当传入的task为null,则会从getTask()方法中获取runnable对象

try {
      Runnable r = timed ?
          workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
          workQueue.take();
      if (r != null)
          return r;
    } catch (InterruptedException retry) { }

这里面通过timed变量判断是使用poll还是task
poll:如果有值,则立马返回。否则等待一段时间,该时间内阻塞,时间结束后,队列还是没有元素则返回null。
task:如果有值,则立马返回。否则一直阻塞。

什么时候返回null,不让worker继续存活了呢?
1、线程池被shutdown,并且任务队列空了;
2、线程池超容量;
3、超时;

用一张图总结

线程池原理.jpg

End

上一篇下一篇

猜你喜欢

热点阅读