看!源码之Java线程池超详细解读

看!源码之FutureTask超详细解读

2019-05-04  本文已影响0人  starskye

FutureTask

之前讲述了ScheduledThreadPoolExecutor,在他里面用到了FutureTask,这里将对其进行讲述,此类的继承在netty文集中有进行详细讲解,如果不清楚的朋友可以暂时跳转到netty文集,下面开始正文.

public class FutureTask<V> implements RunnableFuture<V> {
    //当前任务的运行状态 volatile修饰了他的可见性
    //因为任务在运行中是使用子线程运行的在其他线程中可以对任务进行取消和中断等操作,所以这里修饰为了再多线程中显示
    private volatile int state;
    //以下是state的几个值
    //任务初始是new
    private static final int NEW          = 0;
    //任务执行完成
    private static final int COMPLETING   = 1;
    //任务正常
    private static final int NORMAL       = 2;
    //任务执行时发生异常
    private static final int EXCEPTIONAL  = 3;
    //任务取消
    private static final int CANCELLED    = 4;
    //任务中断中
    private static final int INTERRUPTING = 5;
    //任务中断
    private static final int INTERRUPTED  = 6;
    //下面是任务状态的转换顺序
    /*
    NEW -> COMPLETING -> NORMAL
    NEW -> COMPLETING -> EXCEPTIONAL
    NEW -> CANCELLED
    NEW -> INTERRUPTING -> INTERRUPTED
    */
    //最终执行的回调方法此接口内部只有一个call方法并且可以获取执行的返回值
    private Callable<V> callable;
    //任务执行的返回值,也就是callable执行的结果
    //此结果并没有可见性修饰是因为对他的操作都是原子操作
    private Object outcome; 
    //此线程用于运行callable
    private volatile Thread runner;
    //等待任务执行结果节点
    private volatile WaitNode waiters;
    //获取最终的执行结果,可以看出是private修饰的是它内部使用的方法
    @SuppressWarnings("unchecked")
    private V report(int s) throws ExecutionException {
        //获取callable的返回结果
        Object x = outcome;
        //如果传入的状态是正常结束则返回获取的结果
        if (s == NORMAL)
            return (V)x;
        //如果获取的结果是取消或者中断则抛出取消异常
        if (s >= CANCELLED)
            throw new CancellationException();
        //可以通过上方的结果进行排除还有异常状态、初始状态、执行完成状态
        //但是这里只处理了异常状态并且包装成ExecutionException
        //因为其与两个状态并不会进入此方法,在下面会讲述
        throw new ExecutionException((Throwable)x);
    }

    //构造函数传入的Callable的实现对象
    //并且初始化状态为new
    public FutureTask(Callable<V> callable) {
        if (callable == null)
            throw new NullPointerException();
        this.callable = callable;
        this.state = NEW;       // 这里操作state将会叫所有操作此对象的线程得知state状态初始化为了new,也是volatile修饰的原因
    }
    //第二个构造器,支持传入runnable实现,但是任务执行的试callable所以需要第二个参数传入runnable的返回值
    public FutureTask(Runnable runnable, V result) {
        //通过传入的两个参数调用了Excetors用来创建callable实现
        //返回时RunnableAdapter的实例,此对象非常简单就是调用runnable的run方法并且返回传入result
        this.callable = Executors.callable(runnable, result);
        this.state = NEW;
    }
    //判断任务是否取消,中断也是取消所以此处大于等于取消
    public boolean isCancelled() {
        return state >= CANCELLED;
    }
    //判断是否完成,只要不是NEW都算完成,所以此方法判断是否执行完毕得到的结果是错误的。
    //因为运行中状态并不能算是完成因为并没有确切的结果
    public boolean isDone() {
        return state != NEW;
    }
    //取消执行任务
    //mayInterruptIfRunning 参数比较有含义,具体含义需要根据代码讲解
    public boolean cancel(boolean mayInterruptIfRunning) {
        //进入此方法先判断state==new 当前状态是new并且对state进行原子操作。
        //UNSAFE.compareAndSwapInt(this, stateOffset, NEW,mayInterruptIfRunning ? INTERRUPTING : CANCELLED)
        //传入当前对象并且传入state在当前对象中的内存偏移传入预计的值代表我认为当前state值是new,然后判断mayInterruptIfRunning如果为true则代表终端中,否则是取消状态
        //在最外层有个取反动作,代表如果当前任务状态是new并且成功设置为了中断中或者取消状态则执行下面的语句进行后续处理,如果当前状态已经不是new则返回false代表取消失败
        //这里有点绕,可以理解为如果当前状态是NEW并没有运行则代表取消失败如果当前状态是其他状态则会继续向下运行。
        if (!(state == NEW &&
              UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
                  mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
            return false;
        try {    
            //进入此处需要注意的是当前任务已经在运行中
            //而mayInterruptIfRunning变量继续发挥作用如果是true
            //上面代码也看到是true则设置为中断中状态所以此处获取当前任务的执行线程并且调用中断
            if (mayInterruptIfRunning) {
                try {
                    Thread t = runner;
                    if (t != null)
                        t.interrupt();
                } finally { // final state
                    //最终结果一定是一个具体的结果,而上方设置为了中断中,所以在线程调用中断后再次设置状态为中断
                    UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
                }
            }
        } finally {
            //当中断操作完成则进行完成操作
            finishCompletion();
        }
        // 这里做个小结: 此方法取消状态为false 则代表此方法并未运行,为true则代表此方法运行中已经取消,而mayInterruptIfRunning参数则代表是立即中断线程取消,还是等待线程正常执行完,不管哪一种最终的状态都是取消
        //到此处则代表取消成功
        return true;
    }
    //获取任务执行结果
    //抛出了两个异常 线程中断异常,肯定是调用了上方的取消或者线程被意外干掉,执行异常,是FutureTask的结果异常代表在任务运行期间发生了异常,之前在将report方法的时候又看到抛出了CancellationException异常,这里并没有抛出,因为此异常是运行时异常
    public V get() throws InterruptedException, ExecutionException {
        //获取当前任务运行状态
        int s = state;
        //如果任务是未运行NEW或者执行完成COMPLETING状态则
        if (s <= COMPLETING)
            //当前线程等待完成传入参数后面讲解等待的时候会进行说明
            s = awaitDone(false, 0L);
        //不管是否执行完最终都会得到结果,如果执行完则直接返回结果
        //如果未执行完则等待执行直到结束所以s永远都会有值
        //而此方法也可以叫做死等待,他会一直等待直到结束所以此方法阻塞
        return report(s);
    }
    //此方法是上方方法的重构,传入两个参数1、超时时间类型long2、传入这串时间的时间类型是秒还是毫秒或者说纳秒
    public V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException {
        if (unit == null)
            throw new NullPointerException();
        int s = state;
        //不管传入的单位是什么最终他会以纳秒进行判断
        //如果当前状态是未运行或者运行中状态则等待与方法调用不同的是传入了true,并且传入了超时的纳秒数,如果等到超时则抛出超时异常
        if (s <= COMPLETING &&
            (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
            throw new TimeoutException();
        return report(s);
    }
    //丢给子类的钩子在任务完成的时候进行调用的
    protected void done() { }
    //此处是设置具体的执行结果
    protected void set(V v) {
        //之前说过执行结果是原子操作,所以此处使用了原子操作进行对结果操作
        //但是并不是直接原子操作操作执行结果而且使用了状态值
        //如果当前状态是NEW则修改为执行完成,修改失败则代表此任务已经被别的线程干预了取消或者中断
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            //到此处说明并没有其他线程干预结果从而设置当前值为传入的v
            outcome = v;
            //此处设置当前状态为正常,并且防止执行为重排序
            UNSAFE.putOrderedInt(this, stateOffset, NORMAL);//当到了最终状态则调用最后的处理方法
            finishCompletion();
        }
    }  
    //设置当前任务结果为异常,操作与上方设置值一样只不过最终设置结果不是正常而是异常
    protected void setException(Throwable t) {
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            outcome = t;
            UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
            finishCompletion();
        }
    }
    //任务的执行方法
    public void run() {
        //如果当前任务不是NEW代表已经有人在执行了所以直接return
        //但是只根据这个状态判断是不准确的再具体完成时一直是NEW所以在此设置当前的运行线程为当前线程先判断当前任务的运行线程是否为null如果是则设置为当前线程,如果设置失败则代表runner已经有值了所以也是reutrn
        if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                         null, Thread.currentThread()))
            return;
        try {
            //设置成功则获取当前的callabel
            Callable<V> c = callable;
            //如果当前任务不是null并且当前状态是NEW
            //这是为了防止在这一段代码执行时其他线程进行取消操作
            if (c != null && state == NEW) {
                V result;
                boolean ran;
                try {
                    //因为进入这里的都是执行线程了所以直接调用call方法
                    //这里可以看到使用了try捕获执行异常如果正常则设置ran为true
                    result = c.call();
                    ran = true;
                } catch (Throwable ex) {
                    //如果发生了异常则设置当前值为执行一次并且设置ran为false
                    result = null;
                    ran = false;
                    setException(ex);
                }
                //如果ran则代表正常执行完成则设置获取到的结果
                if (ran)
                    set(result);
            }
        } finally {
            //到此处runner设置为null,runner最终的用处就是确保run方法在并发下只执行一次,而之所以在这设置为null因为结果和最终状态已经确定了
            runner = null;
            //获取当前状态,如果在运行到此处被中断或者去取消则进行之后的处理
            int s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
    }
    //此方法与run方法最大的区别在于run方法只执行一次,而此方法会将当前任务重置可以再次执行,用于周期任务的执行
    protected boolean runAndReset() {
        //同样使用当前状态和当前执行线程用来锁住方法在同一次执行只执行一次,因为可能多个线程同时执行此方法
        if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                         null, Thread.currentThread()))
            return false;
        //获取执行结果和当前任务的状态
        boolean ran = false;
        int s = state;
        try {
            //获取当前需要执行的具体任务callable
            Callable<V> c = callable;
            //任务不是null并且当前状态是NEW则调用call方法并且设置允运行结果状态为true,运行失败则设置结果内容为执行异常
            if (c != null && s == NEW) {
                try {
                    c.call(); // don't set result
                    ran = true;
                } catch (Throwable ex) {
                    setException(ex);
                }
            }
        } finally {
            //与run方法一致
            runner = null;
            s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
        //因为此处没有设置结果为正常 所以当正常执行完成状态应该还是NEW
        //如果不是NEW则代表运行时发生可异常从而运行失败返回false
        return ran && s == NEW;
    }
    //所谓的处理中断就是等待取消方法的中断,因为在讲取消方法的时候先会将当前状态设置为INTERRUPTING,然后进过一系列处理再将它设置为INTERRUPTED(这里是指mayInterruptIfRunning为true如果为false则此方法不会有执行操作因为只有s为INTERRUPTING的时候才会进入while)
    //而由于cpu执行速度非常快并不一定执行到这里会将cancel方法执行完。
    private void handlePossibleCancellationInterrupt(int s) {
        //所以此处判断当前状态如果是INTERRUPTING则告诉cpu我没有什么可以执行了放弃当前占用的cpu资源,如果cpu忽略你的放弃或者并没有其他资源用到cpu从而会是死循环不听的说我放弃cpu资源
        //直到状态修改为最终的INTERRUPTED此方法结束
        if (s == INTERRUPTING)
            while (state == INTERRUPTING)
                Thread.yield(); 
    }
    //等待节点,在获取结果数据的时候可能会有很多线程等待获取从而不确定数量所以java采用了链表作为等待返回的唤醒,在讲解创建的时候将会详细介绍
    static final class WaitNode {
        volatile Thread thread;
        volatile WaitNode next;
        WaitNode() { thread = Thread.currentThread(); }
    }

    //之前在设置结果的时候不管是正常结束还是异常结束设置结果或者是    取消都使用到了此方法
    //而此方法的目的就是释放等待结果的线程,在get中有awateDone方法用于当前获取结果的线程等待执行结果,而此方法就是对线程的唤醒
    private void finishCompletion() {
        // assert state > COMPLETING;
        //声明q临时变量,并且将当前的等待链表赋值给他进行for遍历条件是q不等null等于null说明链表遍历完毕
        for (WaitNode q; (q = waiters) != null;) {
            //赋值过后紧接着给当前的等待列表赋值为null,可以看出采用了cas比较并且交换如果在赋值后等待列表发生了改变那么将会使此处比较失败从而不进if继续for循环进行复制操作。
            if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
                //如果比较成功并且设置为null则代表再次之间等待链表并没有发生改变,从而进入这个死循环
                for (;;) {
                    //获取当前节点的线程
                    Thread t = q.thread;
                    if (t != null) {
                        //不等于null则唤醒当前线程
                        q.thread = null;
                        LockSupport.unpark(t);
                    }
                    //然后获取当前节点的next
                    WaitNode next = q.next;
                    //如果等于null则跳出死循环
                    if (next == null)
                        break;
                    //并且将q的next赋值为null
                    //此处是为了方便GC回收从而断开q.next的引用
                    q.next = null; 
                    //将next节点赋值给q
                    //直到next==null的时候跳出循环,再次查看当前的等待链表是否又有新的等待线程如果有则继续重复否则此方法结束
                    q = next;
                }
                break;
            }
        }
        //调用完成方法,此方法在这里是空的如果有特殊需要实现子类继承即可
        done();
        //将当前的任务设置为null,以便减少对内存的引用这样方便回收
        callable = null; 
    }
    //此方法和上方方法对应,此方法是用来给链表里添加等待线程的
    //如果有线程在任务未执行完的时候调用了get方法将会进入此方法等待
    private int awaitDone(boolean timed, long nanos)
        throws InterruptedException {
        //获取他的死线值,死线值意思是死等结果的时间
        //如果设置了等待时间则timed是true则用当前时间的纳秒值加上传入的纳秒值获取等待的最终时间如果是timed是false则返回0L
        final long deadline = timed ? System.nanoTime() + nanos : 0L;
        //创建一个链表节点
        WaitNode q = null;
        //是否已经加入到当前队列
        boolean queued = false;
        //进入死循环进行操作
        for (;;) {
            //如果当前线程已经被中断
            if (Thread.interrupted()) {
                //则从等待列表中移除并且抛出异常
                removeWaiter(q);
                throw new InterruptedException();
            }
            //获取当前状态
            int s = state;
            //如果大于COMPLETING运行完成状态则代表已经有具体的结果值了从而清空当前节点中的线程引用方便GC回收
            if (s > COMPLETING) {
                if (q != null)
                    q.thread = null;
                return s;
            }
            //如果当前状态是运行完成则告诉系统当前cpu资源可以不用了,因为下面执行都是对线程等待的操作,既然已经知道结果了所以就跳过等待的操作从而进入上一个if直接返回结果
            else if (s == COMPLETING) // cannot time out yet
                Thread.yield();
            //走到这里状态肯定是NEW等待执行或者执行中从而创建一个节点new 此节点的构造自动赋值thread为当前线程
            else if (q == null)
                q = new WaitNode();
            //到此处则判断是否加入队列而这里的queued代表是否已经加入队列
            //false代表并为加入所以取反进入if
            else if (!queued)
                //因为说是链表所以他的next属性指向原先的列表的头从而成为新的头部
                //然后比较当前运行时间点的等待列表是否进行了更新如果更新则继续循环判断上方的if否则将当前的等待列表进行更新
                queued = UNSAFE.compareAndSwapObject(
                    this, waitersOffset,q.next = waiters, q);
            //如果到此说明已经加入到了等待列表并且当前等待是有设置超时的
            else if (timed) {
                //从而判断在循环中循环的时间是否达到了用户设置的超时时间
                nanos = deadline - System.nanoTime();
                //如果达到则是小于等于0 
                if (nanos <= 0L) {
                    //从而删除节点并且返回当前的状态,因为等待时间已经超过用户的设置时间
                    removeWaiter(q);
                    return state;
                }
                //否则则用互斥锁将当前的线程进行阻塞并且限制时间
                LockSupport.parkNanos(this, nanos);
            }
            else
                //如果没有设置时间则永久等待直到唤醒
                LockSupport.park(this);
            //这里要注意park、parkNanos两个方法是阻塞代表如果唤醒将还会在此处代码,说明唤醒后还在此处死循环这样再次经过上方的判断从而得到结果,当然写入等待节点只会执行一次
        }
    }
    //删除等待节点
    private void removeWaiter(WaitNode node) {
        //当前传入节点不是null
        if (node != null) {
            //设置当前节点的线程引用null
            node.thread = null;
            //设置死循环的别名叫retry
            retry:
            for (;;) {
                //声明三个变量pred q s,这三个变量都是WaitNode类型
                //pred 代表头node
                //q 当前链表头
                //s 当前链表头的next
                //条件是q != null
                //而操作是q = s代表每次循环q都已经修改为q.next
                for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
                    //给s赋值当前的链表头
                    s = q.next;
                    //如果q的线程不是null则将q设置为链表头
                    //如果等于null则代表此节点需要删除因为在进入删除节点方法的时候就设置节点线程值为null了
                    if (q.thread != null)
                        pred = q;
                    //如果头节点不是null则代表删除节点已经不是头节点了所以讲头结点的next设置为之前链表头的next节点
                    else if (pred != null) {
                        pred.next = s;
                        //进行检查是否当前节点被移除 如果是则重新进入死循环
                        if (pred.thread == null) // check for race
                            continue retry;
                    }
                    //如果第一个节点就是q则通过cas进行移除第一个节点,否则通过上方逻辑进行移除
                    else if (!UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                          q, s))
                        continue retry;
                }
                break;
            }
            //此方法运行案例
            //链表值[1,2,3,4,5]
            //如果要移除3
            //第一次循环值为 pred = 1 q = 1 s = 2
            //第二次循环值为 pred = 2 q = 2 s = 3
            //第三次循环值为 pred = 3 q = 2 q.next = 4 此处链表结果为:[1,2,4,5]
            //第四次循环值为 pred = 4 q = 4 s = 5
            //第五次循环置为 pred = 5 q = 5 s = null从而跳出循环进入外层死循环
            //如果要移除1
            //第一次循环   pred = null q = 1 s = 2 通过cas替换当前等待链表的头从而结果为[2,3,4,5]
        }
    }

    // 文章中有很多cas操作而cas操作需要使用到内存中的地址偏移值而此处则是通过静态代码块获取三个属性的偏移值一遍操作的时候使用。
    //因为类的成员偏移是不会变的不管你是否赋值都会有哪一段空间所以此处使用静态获取并且操作的时候传入了this进行了优化否则个对象都需要获取偏移。
    private static final sun.misc.Unsafe UNSAFE;
    private static final long stateOffset;
    private static final long runnerOffset;
    private static final long waitersOffset;
    static {
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class<?> k = FutureTask.class;
            stateOffset = UNSAFE.objectFieldOffset
                (k.getDeclaredField("state"));
            runnerOffset = UNSAFE.objectFieldOffset
                (k.getDeclaredField("runner"));
            waitersOffset = UNSAFE.objectFieldOffset
                (k.getDeclaredField("waiters"));
        } catch (Exception e) {
            throw new Error(e);
        }
    }

上一篇下一篇

猜你喜欢

热点阅读