Java并发编程Jvm

Java并发编程 - ThreadPoolExecutor之Wo

2019-03-20  本文已影响12人  HRocky

初识Worker

线程池顾名思义是存放线程的池子,ThreadPoolExecutor是语言级别上对它的定义,既然要存放线程,那么其内部就需要某种数据结构来存储代表线程的Thread对象。Java API定义的容器类可以用来完成存储的功能,由于线程集中线程的松散性和不可重复性的特点,选用Set来存储比较合适。

那么,就可以在ThreadPoolExecutor类中这样定义:

private final HashSet<Thread> threads = new HashSet<Thread>();

存储的问题我们解决了,那么下一个问题就是线程执行用户任务的问题,虽然现在还没提到任务队列,不过就当成我们已经知道了,需要执行的用户任务会被放入到ThreadPoolExecutor内部定义的任务队列里,线程池的线程会从这个任务队列里取出用户任务来执行。

ThreadPoolExecutor-Worker-1.png

线程池中的线程要取用户任务并执行,那么线程池中的线程类要这样定义:

public class ThreadPoolThread extends Thread {
    @Override
    public void run() {
        // 获取队列中的用户任务
        Runnable userTask =  getUserTaskFromQueue();  
        userTask.run();
    }
}

我们基于Runnable接口编程,"获取队列中的用户任务并执行"的逻辑代码可以剥离出来,定义成一个新的任务类:

public class ThreadTask implements Runnable {
    @Override
    public void run() {
        // 获取并执行队列中的任务
        Runnable userTask =  getUserTashFromQueue();
        userTask.run();
    }
}

这样定义后,创建线程并放到线程池中,就可以这样做:

ThreadTask threadTask = new ThreadTask();
Thread thread = new Thread(threadTask);
thread.start();
threads.put(thread);

线程池中的线程需要类似上面代码那样创建后放入到线程池中的,那么上面的这段代码什么时候执行?

有两种方案:一种是在线程池启动的时候我们就创建一定数量的线程,也就是所谓的"预先启动线程",这种方案的策略是不管有没有任务需要执行,预先启动一定数量的线程然后等着任务到来;还有一种方案就是提交任务的时候再创建线程,只要数量不达到一个预定的值即可。提交任务的时候再创建线程,那么想一下,这个任务有必要放到任务队列中吗?用刚创建的线程来执行这个任务不就行了嘛?是的,的确是这样。回头看一下上面创建线程的代码,也就是说:

Thread thread = new Thread(threadTask);

这里操作的时候就希望把用户任务设置进去,但是threadTask本身就是一个任务对象,没有方法设置两个任务对象。那就需要对ThreadTask改造了,改造也很简单,类似这样:

public class ThreadTask implements Runnable {
    
    Runnalbe userTask;

    public ThreadTask (Runnable userTask) {
        this.userTask = userTask;
    }

    @Override
    public void run() {
        if (userTask != null || (userTask = getUserTaskFromQueue() ) != null)  {
            userTask.run();
        }
    }
  
    public Runnalbe getUserTaskFromQueue() {
        Runnable queueTask =  getUserTashFromQueue();
        return queueTask;
    }

}

上面的代码还有个问题,就是线程需要不断地从队列中取用户任务执行,而我们知道线程执行完run方法后就会停止,所以上面的代码达不到循环取任务的效果,需要对run方法的逻辑进行修改,如下:

while (userTask != null || (userTask = getUserTaskFromQueue() ) != null)  {
     userTask.run();
}

重新定义ThreadTask之后,创建线程池线程的代码,修改成如下所示:

Runnable userTask = new UserTask();
ThreadTask threadTask = new ThreadTask(userTask);
Thread thread = new Thread(threadTask);
threads.put(thread);
thread.start();

ThreadTask需要Thread的调用,也可以说其实就是Thread的run方法中的一部分,因为我们基于Runnable编程所以把它给提取出来了,既然ThreadTask是依托于Thread的,那么创建ThreadTask的时候就把要运行它的线程确定下来可以吗?是可以的,对ThreadTask改造如下:

public class ThreadTask implements Runnable {
    
    Runnalbe userTask;

    Thread thread;

    public ThreadTask (Runnable userTask) {
        this.userTask = userTask;
        thread = new Thread(this);
    }

    @Override
    public void run() {
        while (userTask != null || (userTask = getUserTaskFromQueue() ) != null)  {
            userTask.run();
        }
    }
  
    public Runnalbe getUserTaskFromQueue() {
        Runnable queueTask =  getUserTashFromQueue();
        return queueTask;
    }

}

上面的代码中我们将Thread作为了ThreadTask的一个属性,在ThreadTask的构造方法中通过this将自身对象设置进去。这样改造后,创建线程池线程的代码,修改成如下所示:

Runnable userTask = new UserTask();
ThreadTask threadTask  =  new ThreadTask(userTask);
threadTask.thread.start();// 启动线程

经过上面的一系列改造,ThreadTask具有了获取队列用户任务以及执行用户任务的功能,那么ThreadPoolExecutore只要持有ThreadTask的集合集合,也就是说我们上面的Thread集合可以修改成:

private final HashSet<ThreadTask> threads = new HashSet<ThreadTask>();

内部任务的执行就是通过ThreadTask这个对象的操作来完成,将用户交给它,它会启动内部持有的线程,然后执行用户任务。

有没有觉得ThreadTask这个名字并不完全符合了。这里不用费心思考合适的名字,它就是这篇文章要介绍的类——Worker的简化版。

JDK中Worker类的定义:

ThreadPoolExecutor$Worker

private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
    /**
     * This class will never be serialized, but we provide a
     * serialVersionUID to suppress a javac warning.
     */
    private static final long serialVersionUID = 6138294804551838833L;

    /** Thread this worker is running in.  Null if factory fails. */
    final Thread thread;
    /** Initial task to run.  Possibly null. */
    Runnable firstTask;
    /** Per-thread task counter */
    volatile long completedTasks;

    /**
     * Creates with given first task and thread from ThreadFactory.
     * @param firstTask the first task (null if none)
     */
    Worker(Runnable firstTask) {
        setState(-1); // inhibit interrupts until runWorker
        this.firstTask = firstTask;
        this.thread = getThreadFactory().newThread(this);
    }

    /** Delegates main run loop to outer runWorker  */
    public void run() {
        runWorker(this);
    }

    // Lock methods
    //
    // The value 0 represents the unlocked state.
    // The value 1 represents the locked state.

    protected boolean isHeldExclusively() {
        return getState() != 0;
    }

    protected boolean tryAcquire(int unused) {
        if (compareAndSetState(0, 1)) {
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
        }
        return false;
    }

    protected boolean tryRelease(int unused) {
        setExclusiveOwnerThread(null);
        setState(0);
        return true;
    }

    public void lock()        { acquire(1); }
    public boolean tryLock()  { return tryAcquire(1); }
    public void unlock()      { release(1); }
    public boolean isLocked() { return isHeldExclusively(); }

    void interruptIfStarted() {
        Thread t;
        if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
            try {
                t.interrupt();
            } catch (SecurityException ignore) {
            }
        }
    }
}

中文翻译把它称之为:工作线程或工作者。

上面贴出来的代码可以看到,run方法的执行逻辑交给了外部定义的runWorker方法。

runWorker

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        while (task != null || (task = getTask()) != null) {
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted.  This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

Worker锁

可以看到上面贴出来的Worker类继承了AQS,重写了锁获取与释放的方法,Worker具有了锁的特性,而且是一把互斥锁。runWorker使用了Worker这把互斥锁,通过它保护用户任务的执行。搜索ThreadPoolExecutor的源码可以发现除了Worker本身的run方法调用了runWorker之外,没有其他地方执行了调用,而run方法的调用本身就是线程封闭式的,不会存在一个线程闯入进来打乱线程的执行。也就是runWorker的执行不存在竞争,为什么要用锁来保护?

目的是中断的控制, 正如Worker类注释锁描述的那样:

Class Worker mainly maintains interrupt control state for threads running tasks...

控制中断?怎么说?

这样说,现在问你,Worker代表工作线程,它正在执行任务,这时候想要中断它怎么做?Woker严格意义来说不是线程对象,但是它的内部包含了线程对象,这个线程对象就是运行Worker称之为工作线程的体现。想中断Worker的操作,那么中断它内部的这个线程就行了,于是我们可以这样操作:

Thread t = worker.thread;
t.interrupt();

通过这样的方式就可以中断Worker的执行。前提是可以持有想中断的线程对象的Worker对象。

但是现在的需求是在Worker工作的时候不允许中断它。也就是在执行runWorker内部逻辑的时候不允许中断正在运行它的线程。这时候runWorker使用的Worker锁就起到作用了。

对某一个线程的中断通常是另外一个线程发起的,也就是

Thread t = worker.thread;
t.interrupt();

这段代码是另外一个线程调用的,于是我们可以这样操作:

if (worker.tryLock()) {
    Thread t = worker.thread;
    t.interrupt();
}

如果worker锁现在被想中断的那个线程持有,那么上面的执行线程想获取锁就会失败,无法执行中断操作。

这样通过Worker锁的控制保护了正在执行的任务不能被中断。

还有一点在Worker的构造还是是这样定义的:

Worker(Runnable firstTask) {
    setState(-1); // inhibit interrupts until runWorker
    this.firstTask = firstTask;
    this.thread = getThreadFactory().newThread(this);
}

这里可以看到state的初始值为-1,为什么定义这个数字,其实后面的解释已经很明确了:在进入runWorker方法之前不允许中断。在runWorker进入后用户任务执行之前,做了释放锁的操作,将state的值改为了0,也就是说runWorker锁保护块代码之前可以被中断:

w.unlock(); // allow interrupts

用户任务没执行前被中断ThreadPoolExecutor是允许的。

非重入

看先Worker的tryAcquire方法定义:

protected boolean tryAcquire(int unused) {
    if (compareAndSetState(0, 1)) {
        setExclusiveOwnerThread(Thread.currentThread());
        return true;
    }
    return false;
}

这里的逻辑规定了不能重复获取锁,这个是为什么呢?

首先要明确一点Worker是来执行用户任务的,用户任务的开始执行的执行点在在runWorker方法中:

task.run();

假设现在执行这个调用的是线程A,这时候线程A是持有Worker锁的,我们在自己调用的这个task的run方法中执行这样的操作:

class MyTask implements Runnable {
    @Override
    public void run() {
        threadPoolExecutor.setCorePoolSize(10);
    }
}

执行run方法的当前线程还是线程A,来看看setCorePoolSize方法:

setCorePoolSize

public void setCorePoolSize(int corePoolSize) {
    if (corePoolSize < 0)
        throw new IllegalArgumentException();
    int delta = corePoolSize - this.corePoolSize;
    this.corePoolSize = corePoolSize;
    if (workerCountOf(ctl.get()) > corePoolSize)
        interruptIdleWorkers();
    else if (delta > 0) {
        // We don't really know how many new threads are "needed".
        // As a heuristic, prestart enough new workers (up to new
        // core size) to handle the current number of tasks in
        // queue, but stop if queue becomes empty while doing so.
        int k = Math.min(delta, workQueue.size());
        while (k-- > 0 && addWorker(null, true)) {
            if (workQueue.isEmpty())
                break;
        }
    }
}

这里有个interruptIdleWorkers方法的调用:

interruptIdleWorkers

private void interruptIdleWorkers() {
    interruptIdleWorkers(false);
}

interruptIdleWorkers

private void interruptIdleWorkers(boolean onlyOne) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        for (Worker w : workers) {
            Thread t = w.thread;
            if (!t.isInterrupted() && w.tryLock()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                } finally {
                    w.unlock();
                }
            }
            if (onlyOne)
                break;
        }
    } finally {
        mainLock.unlock();
    }
}

现在线程A执行到这里,刚好w就是它在runWorker执行时候持有的锁,

if (!t.isInterrupted() && w.tryLock()) 

如果允许重入,那么w.tryLock就是true,那么线程A就把它自己它打断了。

所以Worker类在设计的时候就设计成是不可重入。

Worker执行异常处理

Worker执行出现异常指的是什么?

还是来看一下runWorker代码:

runWorker

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        while (task != null || (task = getTask()) != null) {
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted.  This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

注意第2个try...catch块,task是用户自定的任务,在运行的时候可能会出现异常,出现异常后,从上面代码中可以看到捕获异常后被抛出了,那么下面这一句就无法执行:

completedAbruptly = false;

也就是completedAbruptly为初始值true,while循环结束,执行processWorkerExit方法,现在来看一下processWorkerExit的处理:

private void processWorkerExit(Worker w, boolean completedAbruptly) {
    if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
        decrementWorkerCount();

    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        completedTaskCount += w.completedTasks;
        workers.remove(w);
    } finally {
        mainLock.unlock();
    }

    tryTerminate();

    int c = ctl.get();
    if (runStateLessThan(c, STOP)) {
        if (!completedAbruptly) {
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            if (min == 0 && ! workQueue.isEmpty())
                min = 1;
            if (workerCountOf(c) >= min)
                return; // replacement not needed
        }
        addWorker(null, false);
    }
}

#第1步:减workerCount(减1)

if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
    decrementWorkerCount();

#第2步:将当前Worker完成的任务数加到总任务数中,并从Worker集合中移除当前Worker

completedTaskCount += w.completedTasks;
workers.remove(w);

#第3步:尝试终止线程池

tryTerminate();

在对线程池有负效益的操作时,都需要“尝试终止”线程池。

终止线程池不在这里做说明。

#第4步:如果线程池状态是running或shutdown就尝试增加一个新的Worker

int c = ctl.get();
if (runStateLessThan(c, STOP)) {
    ......
    // completedAbruptly为true,上面的代码忽略
    addWorker(null, false);
}

这里调用addWorker方法不一定能增加成功,addWorker内部有具体的判断逻辑。

addWorker逻辑不在这里做说明。

上一篇下一篇

猜你喜欢

热点阅读