Java源码分析----Future

2018-12-28  本文已影响11人  _六道木

一般使用多线程操作的时候会使用Thread+Runnable进行处理,但是这种方式中,Runnable是没有返回值的,假设我们需要获取Runnable的返回值,可能需要如下特殊处理,伪代码如下

String returnValue1 = "";
String returnValue2 = "";
CountDownLatch cdl = ....
new Thread(()->{
    // xxxx操作
    returnValue1 = "返回值";
    cdl.countDown
});
new Thread(()->{
    // xxxx操作
    returnValue2 = "返回值";
    cdl.countDown
});
cdl wait// 程序阻塞在这

print returnValue1
print returnValue2

当Runnable运行完并且赋值完毕则通知CDL,最后主线程在wait处等待两个线程执行完毕,然后获取Runnable的返回值。
这样的做法比较麻烦,而JDK提供了一个叫做Future的东西,他实现了上述的功能,且使用上更加的简便,看下例子

ExecutorService executorService = Executors.newFixedThreadPool(1);
        Future<String> future = executorService.submit(() -> {
            try {
                Thread.sleep(100000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "xxxx";
        });

        try {
            String value = future.get();
            System.out.println(value);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }

操作上更加的简便,调用Future的get方法的时候,就类似cdl wait+获取returnValue1

submit方法

那么下面看下其中是如何实现的,先看下submit方法,实现在java.util.concurrent.AbstractExecutorService中

    public <T> Future<T> submit(Callable<T> task) {
        if (task == null) throw new NullPointerException();
        // 将Callable转换成RunnableFuture,它继承了Runnable和Future
        RunnableFuture<T> ftask = newTaskFor(task);
        // 线程池的execute方法,参数类型为Runnable
        execute(ftask);
        return ftask;
    }

从submit方法可以看出,submit也是执行的execute方法,虽然参数不一样,但是其中Callable转换成Runnable,即RunnableFuture的实现,并将其返回,也就是上述例子中的Future。

这里和第一个例子做类比,这里返回的Future共包含几个功能,简化了使用

newTaskFor

newTaskFor方法如下:

    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
        return new FutureTask<T>(callable);
    }
    public FutureTask(Runnable runnable, V result) {
        // runnable封装成callable
        this.callable = Executors.callable(runnable, result);
        this.state = NEW;       // ensure visibility of callable
    }

返回一个FutureTask,即RunnableFuture的实现类,也是一个Runnable的子类,当调用execute方法的时候,就会执行FutureTask的run方法,这里先不看run方法实现,先看下FutureTask的内部属性

     /**
     * NEW -> COMPLETING -> NORMAL
     * NEW -> COMPLETING -> EXCEPTIONAL
     * NEW -> CANCELLED
     * NEW -> INTERRUPTING -> INTERRUPTED
     */
    // 当前Future的状态值,加了volatile修改,则代表一个线程改变后,马上对另外线程可见
    private volatile int state;
    private static final int NEW          = 0;// 初始状态
    private static final int COMPLETING   = 1;// 执行完毕,还未进行赋值
    private static final int NORMAL       = 2;// 最终完成状态
    private static final int EXCEPTIONAL  = 3;// 出现异常
    private static final int CANCELLED    = 4;// 取消状态
    private static final int INTERRUPTING = 5;// 正被中断
    private static final int INTERRUPTED  = 6;// 被中断

    private Callable<V> callable;
    private Object outcome; // 返回值,不一定是Callable的返回值,出现异常的时候放的是异常对象
    private volatile Thread runner;// 执行run方法的线程
    private volatile WaitNode waiters;//阻塞等待的节点

run方法

这时候再看下run方法

    public void run() {
        // 状态不为初始化值,证明已经执行过,直接返回
        // 状态为NEW,则将runner设置为当前线程,如果失败,证明别的线程已经在操作,直接返回
        if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                         null, Thread.currentThread()))
            return;
        try {
            Callable<V> c = callable;
            // 状态为初始化状态才执行,因为有可能被中断或者调用cancel取消
            if (c != null && state == NEW) {
                V result;
                boolean ran;
                try {
                    // 执行用户定义的call方法,并取的返回值
                    result = c.call();
                    // 执行成功
                    ran = true;
                } catch (Throwable ex) {
                    // 有异常出现,返回值置空,设置成执行失败
                    result = null;
                    ran = false;
                    // 设置状态,和设置返回值为异常对象
                    setException(ex);
                }
                if (ran)// 执行成功,设置返回值
                    set(result);
            }
        } finally {
            // runner在状态值改变之后才能设置为空,否则可能出现多个线程执行run的情况
            runner = null;
            // 中断处理
            int s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
    }

setException和set方法

其中主要看setException和set方法

    protected void set(V v) {
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            outcome = v;
            UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
            finishCompletion();
        }
    }
    protected void setException(Throwable t) {
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            outcome = t;
            UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
            finishCompletion();
        }
    }

调用了set或者setException方法,状态会变成COMPLETING,执行完成后,会将状态设置为NORMAL或者EXCEPTIONAL,而成功时outcome是正常返回值,失败则是Throwable对象。

finishCompletion方法

最后,调用finishCompletion将阻塞的线程唤醒,遍历waiters,调用unpark唤醒线程,处理和AQS有相似之处

 private void finishCompletion() {
        // assert state > COMPLETING;
        for (WaitNode q; (q = waiters) != null;) {
            if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
                for (;;) {
                    Thread t = q.thread;
                    if (t != null) {
                        q.thread = null;
                        LockSupport.unpark(t);
                    }
                    WaitNode next = q.next;
                    if (next == null)
                        break;
                    q.next = null; // unlink to help gc
                    q = next;
                }
                break;
            }
        }
        done();
        callable = null;
    }

get方法

get方法有两种形式,一种是程序会一直等待结果返回,而另外一种是有等待时间的,当时间到了之后,还未返回,则会抛出异常

    public V get() throws InterruptedException, ExecutionException {
        int s = state;
        // 还未完成,则进入awaitDone,阻塞线程
        if (s <= COMPLETING)
            s = awaitDone(false, 0L);
        // 完成之后调用方法获取返回值
        return report(s);
    }

    public V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException {
        if (unit == null)
            throw new NullPointerException();
        int s = state;
        // 还未完成,则进入awaitDone,阻塞线程
        // 时间到了之后会返回状态值s,如果此时还未完成,那么抛出异常
        if (s <= COMPLETING &&
            (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
            throw new TimeoutException();
        return report(s);
    }

可以看到两种方法实现是类似的,但是有时间参数的方法,在指定时间内无法获得返回值的话是会抛出异常的,这点在使用的时候需要注意
awaitDone方法会在状态为未完成状态下阻塞线程,当完成或者指定时间到达的时候返回当前状态,此时有两种情况

接下来看下awaitDone方法实现

    private int awaitDone(boolean timed, long nanos)
        throws InterruptedException {
        // 是否有指定时间,如果没有则为0
        // 如果有,则deadLine为当前时间+超时时间
        final long deadline = timed ? System.nanoTime() + nanos : 0L;
        WaitNode q = null;// 表示当前线程节点
        boolean queued = false;// 是否在排队
        for (;;) {
            if (Thread.interrupted()) {// 当前线程被中断
                removeWaiter(q);// 从链表中移除当前线程节点
                throw new InterruptedException();
            }

            int s = state;
            if (s > COMPLETING) {// 正常完成状态,直接返回
                if (q != null)
                    q.thread = null;
                return s;
            }
            // 还在执行当中,但是从上可以知道,这种状态很快变化为最终状态
            // 所以这里使用yield而不是阻塞可能就是这个原因吧
            else if (s == COMPLETING) 
                Thread.yield();
            else if (q == null)
                // 状态为NEW且为第一次循环,则代表还未结束处理,则先构造一个线程节点
                q = new WaitNode();
            else if (!queued)
                // 在上一个分支判断后,进入下一次循环,如果还是NEW的状态
                // 则将线程节点挂在链表头部
                queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                     q.next = waiters, q);
            else if (timed) {
                // 如果有时间参数,则阻塞特定时间
                nanos = deadline - System.nanoTime();
                if (nanos <= 0L) {
                    // 减少得到deadLine已经到了,则将该节点移除,并返回当前状态
                    removeWaiter(q);
                    return state;
                }
                // 阻塞特定时间
                LockSupport.parkNanos(this, nanos);
            }
            else
                // 没有时间参数的话则一直阻塞知道整个任务完成
                LockSupport.park(this);
        }
    }

这里逻辑不太难懂,总结一下分支:

从上可以知道,一个线程调用get方法后,最多会执行4次循环:

  1. 创建线程节点
  2. 节点加入链表
  3. 阻塞
  4. 任务完成唤醒后再进行一次判断并返回

可以看到这里的循环次数还是比较多的,相当于先来几次自旋再阻塞。

cancel方法

Future还提供了取消任务的入口,即cancel方法,内部将对应的线程进行中断,使正在执行的线程退出

    public boolean cancel(boolean mayInterruptIfRunning) {
        // 状态为NEW 
        // 如果mayInterruptIfRunning为true,状态设置成INTERRUPTING状态
        // 否则设置成CANCELLED
        // 如果状态不为NEW或者说状态设置失败了,则返回false
        if (!(state == NEW &&
              UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
                  mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
            return false;
        try {    
            // 如果设置为true,那么取出当前正在执行的线程,并将其中断
            // 最后设置中断完成状态
            if (mayInterruptIfRunning) {
                try {
                    Thread t = runner;
                    if (t != null)
                        t.interrupt();
                } finally { // final state
                    UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
                }
            }
        } finally {
            // 唤醒阻塞线程
            finishCompletion();
        }
        return true;
    }

综合run方法的逻辑,cancel方法不能取消非NEW状态的任务,且只是设置了一个中断位的标志,如果run方法已经执行到判断状态位后的代码准备运行或者已经运行了,那么cancel还是无法终止任务的执行

题外话

Dubbo实现了自己的Future,整体的交互过程其实是类似的,但是逻辑会比JDK自带的Future会简单一点,因为其中没有多个线程对Future进行get的操作,所以从get的性能上讲,Dubbo的会快一点

上一篇 下一篇

猜你喜欢

热点阅读