多线程Executor

2017-04-21  本文已影响0人  叛逆的青春不回头

一、相关类简单介绍
二、各类之间的关系图
三、 execute方法执行过程


相关类简单介绍

1、Executor:执行已提交的 Runnable 任务的对象。此接口提供一种将任务提交与每个任务将如何运行的机制(包括线程使用的细节、调度等)分离开来的方法
2、Executors:为创建ExecutorService提供了便捷的工厂方法
3、ExecutorService:提供了管理终止的方法,以及可为跟踪一个或多个异步任务执行状况而生成 Future 的方法。其提供两个方法来关闭 ExecutorService:

4、ScheduledExecutorService:ExecutorService的一个重要的子接口,此service是为了支持时间可控的任务执行而设计,其中包括固定延迟执行,周期性执行;不过他还不支持制定特定date执行,这个工作可以交给Timer来做
5、ThreadPoolExecutor:线程池执行器,ExecutorService的一个实现类,它使用可能的几个池线程之一执行每个提交的任务,通常使用 Executors 工厂方法配置
6、Runnable、Callable、Future

各类之间的关系图

下图是一张包含各类关系比较全的一张图,可以看出入口主要在execute(Runnable command) 方法:

execute方法执行过程

一个简单例子:

public class ThreadPoolTest {  
    public static void main(String[] args) {  
        ExecutorService exec=Executors.newCachedThreadPool();  
        for(int i=0;i<5;i++)  
            exec.execute(new Runnable());  
        exec.shutdown();//并不是终止线程的运行,而是禁止在这个Executor中添加新的任务  
    }  
} 

看下源码:

/*===ThreadPoolExecutor.java===*/
public void execute(Runnable command) {      //***(1)***
    if (command == null)
        throw new NullPointerException();
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {   
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);          //***(2)***
    }
    else if (!addWorker(command, false))
        reject(command);
}
  
private boolean addWorker(Runnable firstTask, boolean core) {
    //省略代码..
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        w = new Worker(firstTask);        //***(3)***
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int rs = runStateOf(ctl.get());
 
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                t.start();     //***(5)***
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}
 
 
  
/*===ThreadPoolExecutor.Worker.java===*/
private final class Worker extends AbstractQueuedSynchronizer implements Runnable
{
    /**
     * This class will never be serialized, but we provide a
     * serialVersionUID to suppress a javac warning.
     */
    private static final long serialVersionUID = 6138294804551838833L;
 
    /** Thread this worker is running in.  Null if factory fails. */
    final Thread thread;
    /** Initial task to run.  Possibly null. */
    Runnable firstTask;
    
    Worker(Runnable firstTask) {
        setState(-1); // inhibit interrupts until runWorker
        this.firstTask = firstTask;
        this.thread = getThreadFactory().newThread(this);    //***(4)***
    }
   ...
}
  
  
/*===Executors.DefaultThreadFactory.java===*/
static class DefaultThreadFactory implements ThreadFactory {
    ...
    public Thread newThread(Runnable r) {
        Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(),0);
        if (t.isDaemon())
            t.setDaemon(false);
        if (t.getPriority() != Thread.NORM_PRIORITY)
            t.setPriority(Thread.NORM_PRIORITY);
        return t;
    }
   ...
}

对应执行execute()方法的完整过程:


从上面过程可以看出,会执行到addWorker()的t.start(),这里的t就是new出的Thread,也就是说t.start()实际上执行的是Worker内部的runWork方法。
runWork()内部会在if条件里面使用“短路”:判断firstTask是否为null,若不是null则直接执行firstTask的run方法;如果是null,则调用getTask()方法来获取Runnable类型实例。从哪里获取呢?workQueue中。在上图的绿色框中执行addWork()之前就已经把Runnable类型实例放入到workQueue中了,所以这里可以从workQueue中获取到。


上一篇 下一篇

猜你喜欢

热点阅读