java多线程

多线程-源码解析RunnableFuture

2021-05-13  本文已影响0人  余生爱静
/**
 * A {@link Future} that is {@link Runnable}. Successful execution of
 * the {@code run} method causes completion of the {@code Future}
 * and allows access to its results.
 * @see FutureTask
 * @see Executor
 * @since 1.6
 * @author Doug Lea
 * @param <V> The result type returned by this Future's {@code get} method
 */
public interface RunnableFuture<V> extends Runnable, Future<V> {
    /**
     * Sets this Future to the result of its computation
     * unless it has been cancelled.
     */
    void run();
}

A {@link Future} that is {@link Runnable}. Successful execution of
 * the {@code run} method causes completion of the {@code Future}
 * and allows access to its results.

译文:可运行的Future。 成功执行run方法会导致Future的完成,并允许访问其结果。

从源码可知,RunnableFuture继承了Future和Runnable,所以其就具有了可以运行在线程中,能够取消并且能够异步获取到运行结果。接下来学习一下它的一个具体实现类FutureTask

概论

/**
 * A cancellable asynchronous computation.  This class provides a base
 * implementation of {@link Future}, with methods to start and cancel
 * a computation, query to see if the computation is complete, and
 * retrieve the result of the computation.  The result can only be
 * retrieved when the computation has completed; the {@code get}
 * methods will block if the computation has not yet completed.  Once
 * the computation has completed, the computation cannot be restarted
 * or cancelled (unless the computation is invoked using
 * {@link #runAndReset}).
 *
 * <p>A {@code FutureTask} can be used to wrap a {@link Callable} or
 * {@link Runnable} object.  Because {@code FutureTask} implements
 * {@code Runnable}, a {@code FutureTask} can be submitted to an
 * {@link Executor} for execution.
 *
 * <p>In addition to serving as a standalone class, this class provides
 * {@code protected} functionality that may be useful when creating
 * customized task classes.
 *
 * @since 1.5
 * @author Doug Lea
 * @param <V> The result type returned by this FutureTask's {@code get} methods
 */
public class FutureTask<V> implements RunnableFuture<V>
* A cancellable asynchronous computation.  This class provides a base
 * implementation of {@link Future}, with methods to start and cancel
 * a computation, query to see if the computation is complete, and
 * retrieve the result of the computation.  The result can only be
 * retrieved when the computation has completed; the {@code get}
 * methods will block if the computation has not yet completed.  Once
 * the computation has completed, the computation cannot be restarted
 * or cancelled (unless the computation is invoked using
 * {@link #runAndReset}).

译文:可取消的异步计算。 此类提供Future的基本实现,其中包含启动和取消计算,查询计算是否完成以及返回计算结果的方法。 只有在计算完成后才能返回结果; 如果计算尚未完成,则get方法将阻塞。 一旦计算完成,除非使用runAndReset调用计算,否则无法重新启动或取消计算。

FutureTask主要成员

public class FutureTask<V> implements RunnableFuture<V> {
     /*
     * FutureTask中定义了一个state变量,用于记录任务执行的相关状态 ,状态的变化过程如下
     * NEW -> COMPLETING -> NORMAL
     * NEW -> COMPLETING -> EXCEPTIONAL
     * NEW -> CANCELLED
     * NEW -> INTERRUPTING -> INTERRUPTED
     */
    private volatile int state;
    //主流程状态
    private static final int NEW = 0; //当FutureTask实例刚刚创建到callbale的call方法执行完成前,处于此状态
    private static final int COMPLETING  = 1; //callable的call方法执行完成或出现异常时,首先进行此状态
    private static final int NORMAL    = 2;//callable的call方法正常结束时,进入此状态,将outcom设置为正常结果
    private static final int EXCEPTIONAL = 3;//callable的call方法异常结束时,进入此状态,将outcome设置为抛出的异常
    //取消任务执行时可能处于的状态
    private static final int CANCELLED= 4;// FutureTask任务尚未执行,即还在任务队列的时候,调用了cancel方法,进入此状态
    private static final int INTERRUPTING = 5;// FutureTask的run方法已经在执行,收到中断信号,进入此状态
    private static final int INTERRUPTED  = 6;// 任务成功中断后,进入此状态
    
    private Callable<V> callable;//需要执行的任务,提示:如果提交的是Runnable对象,会先转换为Callable对象,这是构造方法参数
    private Object outcome; //任务运行的结果
    private volatile Thread runner;//执行此任务的线程
  
    //等待该FutureTask的线程链表,对于同一个FutureTask,如果多个线程调用了get方法,对应的线程都会加入到waiters链表中,同时当FutureTask执行完成后,也会告知所有waiters中的线程
    private volatile WaitNode waiters;
    ......
}

FutureTask的成员变量并不复杂,主要记录以下几部分信息:
1、状态
2、任务(callable)
3、结果(outcome)
4、等待线程(waiters)

构造方法

Future 是一个接口,而 FutureTask 是一个实实在在的工具类,这个工具类有两个构造函数,它们的参数和前面介绍的 submit() 方法类似。


FutureTask(Callable<V> callable);
FutureTask(Runnable runnable, V result);

FutureTask的方法

run()方法

FutureTask执行任务的方法当然还是run方法:

public void run() {
        if (state != NEW ||
            !U.compareAndSwapObject(this, RUNNER, null, Thread.currentThread()))
            return;
        try {
            Callable<V> c = callable;
            if (c != null && state == NEW) {
                V result;
                boolean ran;
                try {
                    //执行任务
                    result = c.call();
                    ran = true;
                } catch (Throwable ex) {
                    result = null;
                    ran = false;
                    setException(ex);
                }
                if (ran)
                    //设置结果
                    set(result);
            }
        } finally {
            // runner must be non-null until state is settled to
            // prevent concurrent calls to run()
            runner = null;
            // state must be re-read after nulling runner to prevent
            // leaked interrupts
            int s = state;
            if (s >= INTERRUPTING)
               //判断该任务是否正在响应中断,如果中断没有完成,则等待中断操作完成
                handlePossibleCancellationInterrupt(s);
        }
    }

run方法的大概逻辑如下:
1、如果状态不为new或者运行线程runner失败,说明当前任务已经被其他线程启动或者已经被执行过,直接返回false
2、调用call方法执行核心任务逻辑。如果调用成功则执行set(result)方法,将state状态设置成NORMAL。如果调用失败抛出异常则执行setException(ex)方法,将state状态设置成EXCEPTIONAL,唤醒所有在get()方法上等待的线程
3、如果当前状态为INTERRUPTING(步骤2已CAS失败),则一直调用Thread.yield()直至状态不为INTERRUPTING

set()方法

除非已经设置或取消了该Future,否则将此Future的结果设置为给定值

/**
     * Sets the result of this future to the given value unless
     * this future has already been set or has been cancelled.
     *
     * <p>This method is invoked internally by the {@link #run} method
     * upon successful completion of the computation.
     *
     * @param v the value
     */
    protected void set(V v) {
        if (U.compareAndSwapInt(this, STATE, NEW, COMPLETING)) {
            outcome = v;
            U.putOrderedInt(this, STATE, NORMAL); // final state
            finishCompletion();
        }
    }

finishCompletion()

删除并发送所有等待线程的信号,调用done(),并使callable无效。

 /**
     * Removes and signals all waiting threads, invokes done(), and
     * nulls out callable.
     */
    private void finishCompletion() {
        // assert state > COMPLETING;
        for (WaitNode q; (q = waiters) != null;) {
             ////通过CAS把栈顶的元素置为null,相当于弹出栈顶元素
            if (U.compareAndSwapObject(this, WAITERS, q, null)) {
                for (;;) {
                    Thread t = q.thread;
                    if (t != null) {
                        q.thread = null;
                          //唤醒等待的线程
                        LockSupport.unpark(t);
                    }
                    WaitNode next = q.next;
                    if (next == null)
                        break;
                    q.next = null; // unlink to help gc
                    q = next;
                }
                break;
            }
        }

        done();

        callable = null;        // to reduce footprint
    }

finishCompletion的逻辑也比较简单:
1、遍历waiters链表,取出每一个节点:每个节点都代表一个正在等待该FutureTask结果(即调用过get方法)的线程
2、通过 LockSupport.unpark(t)唤醒每一个节点,通知每个线程,该任务执行完成

get()方法

get方法很简单,主要就是调用awaitDone方法:

/**
     * @throws CancellationException {@inheritDoc}
     */
    public V get() throws InterruptedException, ExecutionException {
        int s = state;
        if (s <= COMPLETING)
            s = awaitDone(false, 0L);
        return report(s);
    }

    /**
     * @throws CancellationException {@inheritDoc}
     */
    public V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException {
        if (unit == null)
            throw new NullPointerException();
        int s = state;
        if (s <= COMPLETING &&
            (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
            throw new TimeoutException();
        return report(s);
    }

awaitDone()

/**
     * Awaits completion or aborts on interrupt or timeout.
     *
     * @param timed true if use timed waits
     * @param nanos time to wait, if timed
     * @return state upon completion or at timeout
     */
    private int awaitDone(boolean timed, long nanos)
        throws InterruptedException {
        // The code below is very delicate, to achieve these goals:
        // - call nanoTime exactly once for each call to park
        // - if nanos <= 0L, return promptly without allocation or nanoTime
        // - if nanos == Long.MIN_VALUE, don't underflow
        // - if nanos == Long.MAX_VALUE, and nanoTime is non-monotonic
        //   and we suffer a spurious wakeup, we will do no worse than
        //   to park-spin for a while
        long startTime = 0L;    // Special value 0L means not yet parked
        WaitNode q = null;
        boolean queued = false;
        for (;;) {
            int s = state;
            if (s > COMPLETING) { //如果state状态大于COMPLETING 则说明任务执行完成,或取消
                if (q != null)
                    q.thread = null;
                return s;
            }
            else if (s == COMPLETING)//如果state=COMPLETING,则使用yield,因为此状态的时间特别短,通过yield比挂起响应更快。
                // We may have already promised (via isDone) that we are done
                // so never return empty-handed or throw InterruptedException
                Thread.yield();
            else if (Thread.interrupted()) { //如果该线程执行interrupt()方法,则从队列中移除该节点,并抛出异常
                removeWaiter(q);
                throw new InterruptedException();
            }
            else if (q == null) { //构建节点
                if (timed && nanos <= 0L)
                    return s;
                q = new WaitNode();
            }
            else if (!queued)//把当前节点入栈
                queued = U.compareAndSwapObject(this, WAITERS,
                                                q.next = waiters, q);
             //如果需要阻塞指定时间,则使用LockSupport.parkNanos阻塞指定时间
            //如果到指定时间还没执行完,则从队列中移除该节点,并返回当前状态
            else if (timed) {
                final long parkNanos;
                if (startTime == 0L) { // first time
                    startTime = System.nanoTime();
                    if (startTime == 0L)
                        startTime = 1L;
                    parkNanos = nanos;
                } else {
                    long elapsed = System.nanoTime() - startTime;
                    if (elapsed >= nanos) {
                        removeWaiter(q);
                        return state;
                    }
                    parkNanos = nanos - elapsed;
                }
                // nanoTime may be slow; recheck before parking
                if (state < COMPLETING)
                    LockSupport.parkNanos(this, parkNanos);//阻塞当前线程nanos秒
            }
            else
                LockSupport.park(this);//阻塞当前线程
        }
    }

整个方法的大致逻辑主要分为以下几步:
1>如果当前状态值大于COMPLETING,说明已经执行完成或者取消,直接返回
2>如果state=COMPLETING,则使用yield,因为此状态的时间特别短,通过yield比挂起响应更快
3>如果当前线程是首次进入循环,为当前线程创建wait节点加入到waiters链表中
4>根据是否定时将当前线程挂起(LockSupport.parkNanos LockSupport.park)来阻塞当前线程,直到超时或者线程被finishCompletion方法唤醒
5>当线程挂起超时或者被唤醒后,重新循环执行上述逻辑

cancel()

 public boolean cancel(boolean mayInterruptIfRunning) {
        //根据mayInterruptIfRunning是否为true,CAS设置状态为INTERRUPTING或CANCELLED,设置成功,继续第二步,否则返回false
        if (!(state == NEW &&
              U.compareAndSwapInt(this, STATE, NEW,
                  mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
            return false;
        try {    // in case call to interrupt throws exception
            if (mayInterruptIfRunning) {//如果mayInterruptIfRunning为true,调用runner.interupt(),设置状态为INTERRUPTED
                try {
                    Thread t = runner;
                    if (t != null)
                        t.interrupt();
                } finally { // final state
                    U.putOrderedInt(this, STATE, INTERRUPTED);
                }
            }
        } finally {
            //唤醒所有在get()方法等待的线程
            finishCompletion();
        }
        return true;
    }
public class FutureTaskDemo {
    public static void main(String[] args) {
        FutureTask<String> futureTask=new FutureTask<>(new Runnable(){

            @Override
            public void run() {
                try {
                    Thread.sleep(20*1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        },"Hello world");
        Thread thread = new Thread(futureTask,"Thread Future");
        thread.start();
        try {
            String result=futureTask.get();
            System.out.println(result);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
    }

}

运行时线程堆栈信息

D:\android\progect\IdeaDemo>jps
10624 Launcher
1296 FutureTaskDemo
5360 Jps
12872

D:\android\progect\IdeaDemo>jstack 1296
2021-05-13 00:03:51
Full thread dump Java HotSpot(TM) Client VM (25.202-b08 mixed mode, sharing):

"Thread Future" #9 prio=5 os_prio=0 tid=0x15a1d800 nid=0x2040 waiting on condition [0x15cef000]
   java.lang.Thread.State: TIMED_WAITING (sleeping)
        at java.lang.Thread.sleep(Native Method)
        at com.idea.future.FutureTaskDemo$1.run(FutureTaskDemo.java:13)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.lang.Thread.run(Thread.java:748)
"main" #1 prio=5 os_prio=0 tid=0x02a0dc00 nid=0x18a8 waiting on condition [0x00d4f000]
   java.lang.Thread.State: WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <0x04cfd7e8> (a java.util.concurrent.FutureTask)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at java.util.concurrent.FutureTask.awaitDone(FutureTask.java:429)
        at java.util.concurrent.FutureTask.get(FutureTask.java:191)
        at com.idea.future.FutureTaskDemo.main(FutureTaskDemo.java:22)

由于我在run方法里面设置了相当于耗时的操作,从堆栈信息可以得知,"Thread Future"的状态是TIMED_WAITING,而主线程"main"则处于WAITING的状态

实现了 Runnable 接口,所以可以将 FutureTask 对象作为任务提交给 ThreadPoolExecutor 去执行,也可以直接被 Thread 执行

又因为实现了 Future 接口,所以也能用来获得任务的执行结果

提交给 ThreadPoolExecutor 去执行

下面的示例代码是将 FutureTask 对象提交给 ThreadPoolExecutor 去执行


// 创建FutureTask
FutureTask<Integer> futureTask
  = new FutureTask<>(()-> 1+2);
// 创建线程池
ExecutorService es = 
  Executors.newCachedThreadPool();
// 提交FutureTask 
es.submit(futureTask);
// 获取计算结果
Integer result = futureTask.get();

直接被 Thread 执行

FutureTask 对象直接被 Thread 执行的示例代码如下所示。相信你已经发现了,利用 FutureTask 对象可以很容易获取子线程的执行结果。


// 创建FutureTask
FutureTask<Integer> futureTask
  = new FutureTask<>(()-> 1+2);
// 创建并启动线程
Thread T1 = new Thread(futureTask);
T1.start();
// 获取计算结果
Integer result = futureTask.get();

总结

利用 Java 并发包提供的 Future 可以很容易获得异步任务的执行结果,无论异步任务是通过线程池 ThreadPoolExecutor 执行的,还是通过手工创建子线程来执行的。

利用多线程可以快速将一些串行的任务并行化,从而提高性能;如果任务之间有依赖关系,比如当前任务依赖前一个任务的执行结果,这种问题基本上都可以用 Future 来解决。

上一篇下一篇

猜你喜欢

热点阅读