Java 杂谈多线程技术分享

Java 多线程之 FutureTask 源码剖析

2019-10-15  本文已影响0人  爱打乒乓的程序员

系统通过多线程优化性能,实际上就是将串行操作转换为并行操作,也就是说将同步操作转换为异步操作。在众多并发类中,FutureTask 类可以接收线程返回的结果,并且可以取消或者中断线程。

先看下 FutureTask 类的类图结构:


由类图可以知道,FutureTask 类是 Runnable 的实现类,所以可以通过线程池 submit() 或者直接 new Thread() 启动线程。

举个栗子:

public class FutureTaskDemo {
    public static void main(String[] args) {
        MyCallableDemo demo = new MyCallableDemo();
//        // 1.直接通过new Thread()启动线程
//        FutureTask task = new FutureTask(demo);
//        new Thread(task).start();
//        try {
//            System.out.println(task.get());
//        } catch (Exception e) {
//            e.printStackTrace();
//        }

        // 2.通过线程池启动线程
        ExecutorService service = Executors.newSingleThreadExecutor();

        Future<String> result = service.submit(demo);
        try {
            System.out.println(result.get());
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            service.shutdown();
        }
    }
}

class MyCallableDemo implements Callable {
    @Override
    public String call() throws Exception {
        return "MuggleLee";
    }
}

输出结果:

MuggleLee

通过例子可以看出,使用FutureTask类可以接收线程完成后返回的结果。如果使用场景是需要接收线程执行的结果(无论是成功执行的结果还是异常返回的信息),实现Callable接口结合FutureTask实现类接收返回数据是比较常见的一种做法。更为常见的做法是通过使用线程池submit()方法接收返回的结果。

线程池的实现类 ThreadPoolExecutor 提供了3个 submit() 方法支持获取线程返回的结果。

Future<?> submit(Runnable task);
<T> Future<T> submit(Runnable task, T result);
<T> Future<T> submit(Callable<T> task);

可以发现,返回值类型都是 Future 接口。那继续看下 Future 接口有哪些抽象方法。

boolean cancel(boolean mayInterruptIfRunning);
boolean isCancelled();
boolean isDone();
V get() throws InterruptedException, ExecutionException;
V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;

通过方法名很明显可以知道各个抽象方法的作用。

实际上,submit() 方法将线程的执行结果封装成 FutureTask 对象返回。

    public Future<?> submit(Runnable task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<Void> ftask = newTaskFor(task, null);
        execute(ftask);
        return ftask;
    }

    public <T> Future<T> submit(Runnable task, T result) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task, result);
        execute(ftask);
        return ftask;
    }

    public <T> Future<T> submit(Callable<T> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task);
        execute(ftask);
        return ftask;
    }

    // 将 Callable 接口对象封装成 FutureTask 对象
    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
        return new FutureTask<T>(callable);
    }

    protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
        return new FutureTask<T>(runnable, value);
    }

接下来,看下创建 FutureTask 对象的源码。

    public FutureTask(Callable<V> callable) {
        if (callable == null)
            throw new NullPointerException();
        this.callable = callable;
        this.state = NEW;
    }
    public FutureTask(Runnable runnable, V result) {
        this.callable = Executors.callable(runnable, result);
        this.state = NEW;
    }

Executor类部分源码:

    public static <T> Callable<T> callable(Runnable task, T result) {
        if (task == null)
            throw new NullPointerException();
        return new RunnableAdapter<T>(task, result);
    }
    static final class RunnableAdapter<T> implements Callable<T> {
        final Runnable task;
        final T result;
        RunnableAdapter(Runnable task, T result) {
            this.task = task;
            this.result = result;
        }
        public T call() {
            task.run();
            return result;
        }
    }

FutureTask 类的第一个构造方法将参数 Callable 对象赋值给 FutureTask 对象的 callable 属性,并设置 state 变量为 NEW(稍后再解释 callable 和 state 两个变量的作用);有意思的是第二个构造方法,将第一个参数 Runnable 对象传给 Executor 类的 callable() 方法,再调用已经实现了 Callable 接口的 RunnableAdapter 适配器类,执行 Runnable 对象的 run() 方法。(设计模式中的适配器模式,不熟悉的可以参考我另外一篇拙作:设计模式之适配器模式

emmmm...上面几个方法的确比较绕,画了一张流程图辅助理解吧。


接下来,重点剖析 FutureTask 类的源码:

FutureTask 类声明了7种状态:

    /**
     * Possible state transitions:(FutureTask状态的转换)
     * NEW -> COMPLETING -> NORMAL
     * NEW -> COMPLETING -> EXCEPTIONAL
     * NEW -> CANCELLED
     * NEW -> INTERRUPTING -> INTERRUPTED
     */
    // 任务当前 FutureTask 对象的状态
    private volatile int state;
    // 新建任务(初始状态)
    private static final int NEW = 0;
    // 任务运行中
    private static final int COMPLETING = 1;
    // 任务正常完成
    private static final int NORMAL = 2;
    // 任务异常
    private static final int EXCEPTIONAL = 3;
    // 任务被取消
    private static final int CANCELLED = 4;
    // 任务中断中
    private static final int INTERRUPTING = 5;
    // 任务被中断
    private static final int INTERRUPTED = 6;

    // 提交的任务
    private Callable<V> callable;
    // 任务运行的结果
    private Object outcome;
    // 执行任务的线程
    private volatile Thread runner;
    // 等待结果的队列(单链表)
    private volatile WaitNode waiters;

除了记录 FutureTask 对象状态之外,还声明了 state、runner、waiters的内存偏移量 和等待节点 WaitNode:

    private static final sun.misc.Unsafe UNSAFE;
    // FutureTask 对象状态在内存中的偏移量
    private static final long stateOffset;
    // 执行任务对象在内存中的偏移量
    private static final long runnerOffset;
    // 等待链表在内存中的偏移量
    private static final long waitersOffset;

    static {
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class<?> k = FutureTask.class;
            // 通过反射获取各对象在内存中的偏移量
            stateOffset = UNSAFE.objectFieldOffset
                    (k.getDeclaredField("state"));
            runnerOffset = UNSAFE.objectFieldOffset
                    (k.getDeclaredField("runner"));
            waitersOffset = UNSAFE.objectFieldOffset
                    (k.getDeclaredField("waiters"));
        } catch (Exception e) {
            throw new Error(e);
        }
    }

    /**
     * 单链表。存储等待线程
     */
    static final class WaitNode {
        volatile Thread thread;
        volatile WaitNode next;

        WaitNode() {
            thread = Thread.currentThread();
        }
    }

当程序执行到线程池的execute(Runnable runnable)方法的时候,由于 execute() 方法接收的参数是 FutureTask 对象,所以肯定是执行 FutureTask 类的 run() 方法。

FutureTask.run()方法剖析

    public void run() {
        // 如果当前 FutureTask 对象的状态不是 NEW 或者执行 CAS 操作赋值给 runnerOffset 失败直接跳出 run 方法
        if (state != NEW ||
                !UNSAFE.compareAndSwapObject(this, runnerOffset,
                        null, Thread.currentThread()))
            return;
        try {
            Callable<V> c = callable;
            if (c != null && state == NEW) {
                V result;
                boolean ran;
                try {
                    result = c.call();
                    ran = true;
                } catch (Throwable ex) {
                    result = null;
                    ran = false;
                    setException(ex);
                }
                if (ran)
                    set(result);
            }
        } finally {
            // 设置 runner 为 null ,利于 GC
            runner = null;
            int s = state;
            // 如果有其它线程在中断任务,会调用 handlePossibleCancellationInterrupt 方法处理
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
    }
    // 执行任务正常结束后调用此方法
    protected void set(V v) {
        // 执行 CAS 方法设置 FutureTask 对象的状态由 NEW -> COMPLETING -> NORMAL
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            // 将任务执行的结果赋值给 outcome 变量
            outcome = v;
            UNSAFE.putOrderedInt(this, stateOffset, NORMAL);
            // 唤醒等待线程
            finishCompletion();
        }
    }
    // 执行任务异常会调用此方法
    protected void setException(Throwable t) {
        // 执行 CAS 方法设置 FutureTask 对象的状态由 NEW -> COMPLETING -> EXCEPTIONAL
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            // 将任务执行的结果赋值给 outcome 变量
            outcome = t;
            UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL);
            // 唤醒等待线程
            finishCompletion();
        }
    }
    /**
     * 唤醒等待线程
     */
    private void finishCompletion() {
        for (WaitNode q; (q = waiters) != null; ) {
            if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
                // 自旋遍历单链表
                for (; ; ) {
                    Thread t = q.thread;
                    if (t != null) {
                        q.thread = null;
                        // 唤醒线程
                        LockSupport.unpark(t);
                    }
                    // 获取下一个节点,直到节点为null
                    WaitNode next = q.next;
                    if (next == null)
                        break;
                    q.next = null;
                    q = next;
                }
                break;
            }
        }
        // 这是一个空方法,可以让开发中扩展使用
        done();
        callable = null;
    }
    private void handlePossibleCancellationInterrupt(int s) {
        // 双重判断,这里困扰我很久。为什么需要两次判断状态是否为 INTERRUPTING 呢?
        // 考虑的场景是:需要确保其它线程执行 cencel(true) 是在执行 run() 或者 runAndReset()的过程中
        if (s == INTERRUPTING)
            while (state == INTERRUPTING)
                Thread.yield();// 通过自旋,优先让其它线程执行,等待 cancel(true) 执行完成
    }

核心代码都加上了注释,结合源码,画了张流程图加深理解吧!

FutureTask.get()方法剖析

    /**
     * 返回执行结果
     */
    public V get() throws InterruptedException, ExecutionException {
        // 获取当前 FutureTask 对象的状态
        int s = state;
        // 如果任务的状态是新建(NEW)或者运行中()就执行 awaitDone 方法等待获取
        if (s <= COMPLETING)
            s = awaitDone(false, 0L);
        // 返回执行结果
        return report(s);
    }

    /**
     * 与上面 get() 相似,加上 timeout 设置过期时间,超时抛出异常
     *
     * @param timeout 过期时间
     * @param unit    时间单位
     * @return 返回执行结果
     */
    public V get(long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException {
        if (unit == null)
            throw new NullPointerException();
        int s = state;
        if (s <= COMPLETING &&
                (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
            throw new TimeoutException();
        return report(s);
    }

    @SuppressWarnings("unchecked")
    private V report(int s) throws ExecutionException {
        Object x = outcome;
        if (s == NORMAL)
            return (V) x;
        if (s >= CANCELLED)
            throw new CancellationException();// 被取消或者中断,直接抛出异常
        throw new ExecutionException((Throwable) x);// 运行过程中发生异常
    }

    private int awaitDone(boolean timed, long nanos)
            throws InterruptedException {
        // 如果设置超时,计算截止时间
        final long deadline = timed ? System.nanoTime() + nanos : 0L;
        // 代表当前等待结果线程的等待节点
        WaitNode q = null;
        // 记录是否把当前线程加入到了队列
        boolean queued = false;
        for (; ; ) {
            if (Thread.interrupted()) {
                removeWaiter(q);// 如果被中断了,删除当前线程节点,并抛出异常
                throw new InterruptedException();
            }
            int s = state;
            // 如果 state 状态大于 COMPLETING 则说明任务已经执行,直接返回状态值
            if (s > COMPLETING) {
                if (q != null)
                    q.thread = null;
                return s;
            } else if (s == COMPLETING) // 如果 state 状态等于 COMPLETING,说明正在设置结果,则放弃时间片轮询等待
                Thread.yield();
            else if (q == null) // 任务状态为 NEW ,构造等待节点
                q = new WaitNode();
            else if (!queued) // 状态为 NEW ,并且节点不为 null ,并且该节点没有加入到 waiter 队列中
                queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                        q.next = waiters, q);
            else if (timed) { // 如果设定超时,进行超时判断
                nanos = deadline - System.nanoTime();
                if (nanos <= 0L) {
                    removeWaiter(q);
                    return state;
                }
            // 阻塞到超时时间
                LockSupport.parkNanos(this, nanos);
            } else // 如果没有设置超时,会一直阻塞,直到被中断或者被唤醒
                LockSupport.park(this);
        }
    }    
    // 通过自旋链表删除指定节点
    private void removeWaiter(WaitNode node) {
        if (node != null) {
            // 设置节点为空,通过自旋找出空节点并删除
            node.thread = null;
            // 自旋保证删除成功
            retry:
            for (; ; ) {
                for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
                    s = q.next;
                    if (q.thread != null)// 如果当前节点不是空,不需要删除
                        pred = q;
                    else if (pred != null) {
                        pred.next = s;
                        if (pred.thread == null)
                            continue retry;
                    } else if (!UNSAFE.compareAndSwapObject(this, waitersOffset,
                            q, s))
                        continue retry;
                }
                break;
            }
        }
    }

结合源码,画出了以下流程图加深理解:


FutureTask.cancel()方法剖析

    /**
     * 任务取消
     * @param mayInterruptIfRunning true:代表中断线程 false:代表取消线程
     * @return
     */
    public boolean cancel(boolean mayInterruptIfRunning) {
        // 如果任务不在 NEW 状态或者执行 UNSAFE 操作失败直接返回 false
        if (!(state == NEW &&
                UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
                        mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
            return false;
        try {
            if (mayInterruptIfRunning) {
                try {
                    Thread t = runner;
                    if (t != null)
                        t.interrupt();
                } finally {
                    // 设置最终的状态为中断状态
                    UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
                }
            }
        } finally {
            // 释放等待线程
            finishCompletion();
        }
        return true;
    }

cancel 方法参数为 mayInterruptIfRunning,当参数值为 false 的时候,直接将 FutureTask 对象的状态设置为 CANCELLED,并释放等待线程;当参数值为 ture 的时候,调用 interrupt() 方法中断线程,并设置最终状态为中断状态。

runAndReset()方法剖析

值得一提的是, FutureTask 类有个方法 runAndReset() 可以让线程重复执行。譬如使用 ScheduledThreadPoolExecutor 线程池,这个类可以使线程周期性的重复执行,具体的可以自行查看相关源码。

    /**
     * 与 run() 方法类似,但该方法可以执行多次。
     * 不同点:1.不设置返回值 2.不设置 state 值(执行完任务后,FutureTask 对象状态还是 NEW )
     */
    protected boolean runAndReset() {
        if (state != NEW ||
                !UNSAFE.compareAndSwapObject(this, runnerOffset,
                        null, Thread.currentThread()))
            return false;
        boolean ran = false;
        int s = state;
        try {
            Callable<V> c = callable;
            if (c != null && s == NEW) {
                try {
                    // 不设置返回值
                    c.call();
                    ran = true;
                } catch (Throwable ex) {    
                    setException(ex);
                }
            }
        } finally {
            runner = null;
            s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
        // 任务执行成功,且状态重置为NEW
        return ran && s == NEW;
    }

isCancelled()、isDone()方法源码:

    // 根据状态码判断任务是否被取消
    public boolean isCancelled() {
        return state >= CANCELLED;
    }

    // 根据状态码判断任务是否执行完成
    public boolean isDone() {
        return state != NEW;
    }

总结

使用 FutureTask 类结合使用线程池,可以通过多线程异步计算任务,最后所有子线程执行完成之后再继续执行主线程。

参考资料:

《Java多线程编程实战指南-核心篇》

http://www.tianshouzhi.com/api/tutorials/mutithread/317

上一篇下一篇

猜你喜欢

热点阅读