JUC并发相关

24. 并发终结之FutureTask

2020-09-24  本文已影响0人  涣涣虚心0215
关于Thread和Runnable的区别:

1)从对象编程来说:Thread是继承;Runnable是组合,会比继承耦合性低,更加灵活。
2)从对象共享角度:Runnable实例可以由过个线程实例共享,会产生并发问题。
3)对象创建成本:Thread在创建的时候JVM就会为其分配调用栈空间,内核线程等资源;而Runnable是普通的类,作为参数传给Thread,所以Runnable创建成本相对较低。

关于用户线程(User)和守护线程(Daemon):

用户线程会阻止JVM正常停止;
守护线程不会影响JVM正常停止,所以守护线程通常用于执行一些重要性不高的任务。

关于Callable和Runnable的区别:

Callable方式需要FutureTask实现类的支持,用于接收运算结果。
FutureTask是Future接口的实现类,且FutureTask也可用于闭锁的操作,因为get() 会阻塞当前线程直到Callable返回结果。

FutureTask

FutureTask提供了支持cancel的异步计算方式,它实现了RunnableFuture接口,而RunnableFuture接口继承了Runnable和Future接口。
FutureTask可以用来包装一个Callable和Runnable对象,且因为实现了Runnable,所以FutureTask可以被提交到线程池处理。

源码分析

因为FutureTask实现了Future接口,所以它会实现Future接口的相关方法,比如说get(), cancel()等等。

成员变量以及构造函数

有三个volatile成员变量,会通过UNSAFE类来进行CAS操作。
另外定义了state变量对应的7中状态。
callable变量意味着FutureTask会将Runnable也封装成Callable来处理。
outcome则是返回这或者异常的信息

private volatile int state;
private static final int NEW          = 0;//初始是NEW
private static final int COMPLETING   = 1;//在set()和setException()里面先CAS更新成COMPLETING,加锁操作
private static final int NORMAL       = 2;//set()方法执行成功从COMPLETING到NORMAL
private static final int EXCEPTIONAL  = 3;//setException()方法执行成功从COMPLETING到EXCEPTIONAL
private static final int CANCELLED    = 4;//cancel(false)的时候
private static final int INTERRUPTING = 5;//cancel(true)的时候
private static final int INTERRUPTED  = 6;//cancel(true)最终状态

/** The underlying callable; nulled out after running */
private Callable<V> callable;
/** The result to return or exception to throw from get() */
private Object outcome; // non-volatile, protected by state reads/writes
/** The thread running the callable; CASed during run() */
private volatile Thread runner;
/** Treiber stack of waiting threads */
private volatile WaitNode waiters;

// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE;
private static final long stateOffset;
private static final long runnerOffset;
private static final long waitersOffset;
static {
    try {
        UNSAFE = sun.misc.Unsafe.getUnsafe();
        Class<?> k = FutureTask.class;
        stateOffset = UNSAFE.objectFieldOffset
            (k.getDeclaredField("state"));
        runnerOffset = UNSAFE.objectFieldOffset
            (k.getDeclaredField("runner"));
        waitersOffset = UNSAFE.objectFieldOffset
            (k.getDeclaredField("waiters"));
    } catch (Exception e) {
        throw new Error(e);
    }
}
构造函数

FutureTask内部有Callable类型的成员变量,所以Runnable会通过一个适配器RunnableAdapter,转换成Callable,内部还是调用的Runnable的run()方法。

public FutureTask(Callable<V> callable) {
    if (callable == null)
        throw new NullPointerException();
    this.callable = callable;
    this.state = NEW;       // ensure visibility of callable
}
//接受Runnable的构造函数
public FutureTask(Runnable runnable, V result) {
    this.callable = Executors.callable(runnable, result);
    this.state = NEW;       // ensure visibility of callable
}
//适配器将Runnable转换成Callable
public static <T> Callable<T> callable(Runnable task, T result) {
    if (task == null)
        throw new NullPointerException();
    return new RunnableAdapter<T>(task, result);
}
//Adapter类,Callable的call方法调用Runnable的run方法。
static final class RunnableAdapter<T> implements Callable<T> {
    final Runnable task;
    final T result;
    RunnableAdapter(Runnable task, T result) {
        this.task = task;
        this.result = result;
    }
    public T call() {
        task.run();
        return result;
    }
}
run()

run()方法主要是调用Callable的call方法,如果有异常则通过setException(Throwable t)设置异常;如果没有异常,则通过set(V result)方法,来设置返回值。且这两个set方法最终都会调用finishCompletion()来unpark唤醒WaitNode节点里的等待线程去获得结果。
run()方法涉及到的方法有setException(), set(), finishCompletion(), handlePossibleCancellationInterrupt()。
涉及到的state有COMPLETING, EXCEPTIONAL, NORMAL
且在call()方法返回之前,都是NEW状态,这样cancel()才会有机会。
且run方法最好检查INTERRUPTING状态,会在handlePossibleCancellationInterrupt自旋直到cancel()方法结束,state变成INTERRUPTED状态(下面代码里分析)。

public void run() {
    //如果state不是NEW或者CAS更新runner成员变量失败,则直接return。
    if (state != NEW ||
        !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                     null, Thread.currentThread()))
        return;
    try {
        Callable<V> c = callable;
        if (c != null && state == NEW) {
            V result;
            boolean ran;
            try {
                //直接调用Callable的call方法并拿到result
                result = c.call();
                ran = true;
            } catch (Throwable ex) {
                //如果call()方法抛出异常,ran=false
                result = null;
                ran = false;
                setException(ex);
            }
            if (ran)
                set(result);
        }
    } finally {
        // runner must be non-null until state is settled to
        // prevent concurrent calls to run()
        runner = null;
        // state must be re-read after nulling runner to prevent
        // leaked interrupts
        int s = state;
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
}
//如果抛出异常,则调用该方法,更新state状态到COMPLETING,
//将Throwable赋值给outcome变量,再更新state到EXCEPTIONAL.
//最后finishCompletion()唤醒WaitNode中等待线程节点
protected void setException(Throwable t) {
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        outcome = t;
        //因为上面更新到COMPLETING成功,这边更新EXCEPTIONAL则不需要CAS操作,而是putOrderedInt内存操作
        UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
        finishCompletion();
    }
}
private void finishCompletion() {
    // assert state > COMPLETING;
    for (WaitNode q; (q = waiters) != null;) {
        //先通过CAS获得更新资格,将waiter变量更新为null
        if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
            for (;;) {
                Thread t = q.thread;
                if (t != null) {
                    //拿到q对应的线程,更新为null,并唤醒该线程
                    q.thread = null;
                    LockSupport.unpark(t);
                }
                //然后操作q的后继节点
                WaitNode next = q.next;
                if (next == null)
                    break;
                q.next = null; // unlink to help gc
                q = next;
            }
            break;
        }
    }
    //done()方法由子类实现
    done();

    callable = null;        // to reduce footprint
}
//如果没有异常,更新state到COMPLETING先,然后将result设置到outcome,更新state到NORMAL
//最后finishCompletion唤醒等待线程
protected void set(V v) {
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        outcome = v;
        UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
        finishCompletion();
    }
}
//final的时候会检查一下state是不是被中断,如果一直在中断过程中,则当前线程yield让出CPU
//这边自旋等待调用中断的线程执行完毕,因为此时run方法已经结束,runner也被重置为null
//   所以别的线程可能有机会提交这个FutureTask,而执行run方法,这时候cancel可能被应用到不同的Task上。
//   所以这里要自旋直到cancel()方法执行结束
private void handlePossibleCancellationInterrupt(int s) {
    // It is possible for our interrupter to stall before getting a
    // chance to interrupt us.  Let's spin-wait patiently.
    if (s == INTERRUPTING)
        while (state == INTERRUPTING)
            Thread.yield(); // wait out pending interrupt
}
cancel()

cancel的时候有几种情况:
1.任务还没开始,直接返回false
2.任务已经开始:
2.1调用cancel(false),就是最后调用finishCompletion()来唤醒等待线程
2.2调用cancel(true),则通过runner获得当前运行线程,调用运行线程的Interrupt()方法来设置目标线程的中断标记位为true
这里涉及到的state状态有INTERRUPTING, CANCELLED, INTERRUPTED

public boolean cancel(boolean mayInterruptIfRunning) {
    if (!(state == NEW &&
          //如果参数是true,则更新为INTERRUPTING状态,否则CANCELLED状态
          UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
              mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
        return false;
    try {    // in case call to interrupt throws exception
        if (mayInterruptIfRunning) {
            try {
                Thread t = runner;
                if (t != null)
                    t.interrupt();//通过设置中断标志位来控制目标线程的生命周期
            } finally { // final state
                UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
            }
        }
    } finally {
        finishCompletion();
    }
    return true;
}
get()

如果state的状态还没到COMPLETING,就在awaitDone()里面通过LockSupport.park()挂起当前线程,直到run()方法执行结束唤醒等待线程。
异常:
1.get()方法响应异常中断,会抛出InterruptedException。
2.且如果是get(long timeout, TimeUnit unit)方法,还会抛出TimeoutException。
3.另外如果Callable的call方法没有捕捉异常而抛出异常,则get()方法会抛出ExecutionException。FutureTask的run()方法在执行Callable的call()方法时,会将call抛出的异常捕获包装成ExecutionException,这意味着call()方法出现异常,不会直接导致执行线程运行结束(相比正常的Thread的run()方法抛出异常可能导致执行线程提前终止生命周期)。

public V get() throws InterruptedException, ExecutionException {
    int s = state;
    if (s <= COMPLETING)
        //如果state还没到COMPLETING,则开始挂起当前线程
        s = awaitDone(false, 0L);
    //run()方法结束之后会唤醒当前线程,去拿到执行结果。
    return report(s);
}
private int awaitDone(boolean timed, long nanos)
    throws InterruptedException {
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    WaitNode q = null;
    boolean queued = false;
    for (;;) {
        //判断当前线程是否被中断
        if (Thread.interrupted()) {
            //如果被中断,则需要把waitNode的节点都垃圾回收掉,抛出InterruptedException
            removeWaiter(q);
            throw new InterruptedException();
        }

        int s = state;
        //如果已经结束,则将waitNode的thread设置为null,并返回state值
        if (s > COMPLETING) {
            if (q != null)
                q.thread = null;
            return s;
        }
        //COMPLETING表示set开始,快结束了,则让执行线程让出CPU等待一下
        else if (s == COMPLETING) // cannot time out yet
            Thread.yield();
        //这里s就还没到COMPLETING,如果waitNode是null,则创建WaitNode
        else if (q == null)
            q = new WaitNode();
        //如果q不为null,则需要添加到WaitNode的next后继节点
        else if (!queued)
            //这个UNSAFE操作,先更新q的next=waiter,再将qCAS更新到waiters变量
            queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                 q.next = waiters, q);
        //如果是有等待时间的,则在等待时间过后,removeWaiter()来处理等待的node
        else if (timed) {
            nanos = deadline - System.nanoTime();
            if (nanos <= 0L) {
                removeWaiter(q);
                return state;
            }
            LockSupport.parkNanos(this, nanos);
        }
        else
            LockSupport.park(this);
    }
}
//处理最终结果
private V report(int s) throws ExecutionException {
    Object x = outcome;
    if (s == NORMAL)
        //如果state是NORMAL,则直接返回outcome值
        return (V)x;
    if (s >= CANCELLED)
        //则就是cancel()使得线程被Interrupt了或者cancel了,抛出CancellationException
        throw new CancellationException();
    //否则就是抛出异常了
    throw new ExecutionException((Throwable)x);
}

因为是无线循环,所以里面的q = new WaitNode()和UNSAFE更新操作在park之前都会做一边,即park之前会将当前线程封装到WaitNode链表里。

上一篇下一篇

猜你喜欢

热点阅读