Java 杂谈java

java Future,FutureTask,Callable

2019-01-14  本文已影响30人  Cc_7397

Future

Future的作用就是在主线程之外另外起一个线程去做另一件事,让java同时可以并行处理。并且提供方法随时去查看Future的是否完成,并取得任务的结果。本篇文章会详细讲解流程和每个方法。

先写一个小例子:
这个例子中要做3件事,其中job2和job3各耗时2000毫秒,而Future可以让他们同时进行

  long a = System.currentTimeMillis();
  System.out.println("do job1");
  Callable<String> callable = () -> { //子线程任务,实现Callable<T> 接口,返回值为T
        Thread.sleep(2000);
        System.out.println("do job2");
        return "job2 is done";
   };
  FutureTask<String> task = new FutureTask<>(callable); //new 一个FutureTask 将任务引用传递进去
  new Thread(task).start();  //启动Future任务
  Thread.sleep(2000);  
  System.out.println("do job3");
  while (!task.isDone()) {  //循环判断Future任务是否完成了
  Thread.onSpinWait(); //相当于Thread.sleep(); 不过效率更高
  }
  System.out.println("job2结果 => " + task.get() + "时间:" + (System.currentTimeMillis() - a));

结果

do job1
do job2
do job3
job2结果 => job2 is done时间:2006

可以看到Future确实让任务并行处理了。
我们这里使用了isDone()方法查询计算是否完成了,下面细节讲解运行原理和它提供的方法。

Future提供的方法

FutureTask实现了Future<V>接口,该接口提供了以下方法:

运行原理和流程

1.首先先看下FutureTask 有哪些属性呢

    private volatile int state; //存储状态的变量 使用volatile
    private static final int NEW          = 0; //代表是刚刚创建没进行如何操作
    private static final int COMPLETING   = 1; //正在计算中
    private static final int NORMAL       = 2; //正常
    private static final int EXCEPTIONAL  = 3; //发生了异常
    private static final int CANCELLED    = 4; //被取消了
    private static final int INTERRUPTING = 5; //中断中
    private static final int INTERRUPTED  = 6; //以被中断
 /** The underlying callable; nulled out after running */
    private Callable<V> callable; //我们提交的任务
    /** The result to return or exception to throw from get() */
    private Object outcome; //存储结果 或者异常
    /** The thread running the callable; CASed during run() */
    private volatile Thread runner; //记录运行的线程
    /** Treiber stack of waiting threads */
    private volatile WaitNode waiters; //等待结果的队列

2.下面讲解运行原理和流程
首先我们创建了一个Callable 记录我们任务的方法,然后new FutureTask 会把Callable赋值给callable属性,将state 设置为NEW也就是0。然后启动线程调用run方法

   public void run() {
        if (state != NEW ||
            !RUNNER.compareAndSet(this, null, Thread.currentThread())) //如果状态不是new或者将runner属性赋值为当前线程时失败就直接返回,这里表明了我们的任务只能启动一次。
            return;
        try {
            Callable<V> c = callable;
            if (c != null && state == NEW) { // callable 不为空,状态为new才继续计算
                V result;
                boolean ran;//flag记录是否计算成功
                try {
                    result = c.call();//调用我们的方法
                    ran = true;//完成了 把flag设置为ture
                } catch (Throwable ex) {  //有异常 结果设置为空 flag设为false
                    result = null;
                    ran = false;
                    setException(ex);  //处理异常的方法
                }
                if (ran)  //计算成功
                    set(result);
            }
        } finally {
            // runner must be non-null until state is settled to
            // prevent concurrent calls to run()
            runner = null;  
            // state must be re-read after nulling runner to prevent
            // leaked interrupts
            int s = state;
            if (s >= INTERRUPTING) //如果有其它线程在中断任务会等到中断结束
                handlePossibleCancellationInterrupt(s);  
        }
    }

先讲解下CAS 这里面好多CAS操作

RUNNER.compareAndSet(this, null, Thread.currentThread())这里VarHandle 来提供CAS操作,我这里是jdk11,你们的版本可能是用unsafe来操作的。他们的作用是一样的,就是先获取某个属性相对对象的内存地址偏移值,然后用CAS方式修改属性的值,因为一个对象属性在内存中的排列是固定的,这样只要有对象的地址和属性的偏移值就能定位属性在内存的地址。这里的RUNNER 是这样来的,可以看到定位了FutureTask这个类的runner属性,类型是Thread

 MethodHandles.Lookup l = MethodHandles.lookup();
 RUNNER = l.findVarHandle(FutureTask.class, "runner", Thread.class);

RUNNER.compareAndSet(A,B,C)方法:修改A对象的RUNNER所代表的属性,如果当前值是B就修改为C,如果失败会一直重试直到,当前属性不是B了或者修改成功。
使用unsafe也是一样的,只不过不会去先取得VarHandle这个句柄,而是在调用时将属性偏移值一起传递进去。

if (state != NEW ||
            !RUNNER.compareAndSet(this, null, Thread.currentThread())) 

所以意思计算如果状态不是new或者将runner属性赋值为当前线程时失败

继续

发生异常

后面就是调用我们自己写的方法了,如果发送了异常我们看下

 protected void setException(Throwable t) {
        if (STATE.compareAndSet(this, NEW, COMPLETING)) {
            outcome = t;
            STATE.setRelease(this, EXCEPTIONAL); // final state
            finishCompletion();//唤起所有等待线程
        }
    }

就是先发状态设置为计算中,然后记录下异常,在把状态设置为EXCEPTIONAL异常,注意setRelease方法表示设置值后这个值不能再被修改。

计算完成

   protected void set(V v) {
        if (STATE.compareAndSet(this, NEW, COMPLETING)) {
            outcome = v;
            STATE.setRelease(this, NORMAL); // final state
            finishCompletion();
        }
    }

其实和发送异常是一样的,只是记录状态不一样
都会调finishCompletion()方法

private void finishCompletion() {
        // assert state > COMPLETING;
        for (WaitNode q; (q = waiters) != null;) { //方法内变量  记录链表 
            if (WAITERS.weakCompareAndSet(this, q, null)) {//将属性的设置为空
                for (;;) {
                    Thread t = q.thread;
                    if (t != null) {
                        q.thread = null;
                        LockSupport.unpark(t);
                    }
                    WaitNode next = q.next;
                    if (next == null)
                        break;
                    q.next = null; // unlink to help gc
                    q = next;
                }
                break;
            }
        }

        done();//空方法 ,给用户扩展用

        callable = null;        // to reduce footprint
    }

这里的WaitNode 是一个链表,记录使用等待结果的线程。上面说的get方法会堵塞直到计算完成,被堵塞的线程就会存储到这个链表。

        for (WaitNode q; (q = waiters) != null;) { //方法内变量  记录链表 
            if (WAITERS.weakCompareAndSet(this, q, null)) {//将属性的设置为空
        //这里可能会有新的队列

这里会一直循环,防止将waiters属性赋值为空之后又有新的线程加入到队列中
然后循环链表把线程唤起使用 LockSupport.unpark(t),这个方法点进去可以发现其实用的是前面提到的Unsafe,juc的包很多都是用Unsafe实现的。

下面讲解各个方法

isDone

 public boolean isDone() {
        return state != NEW;
    }

非常简单。。。

get

    /**
     * @throws CancellationException {@inheritDoc}
     */
    public V get() throws InterruptedException, ExecutionException {
        int s = state;
        if (s <= COMPLETING)
            s = awaitDone(false, 0L);
        return report(s);
    }

    /**
     * @throws CancellationException {@inheritDoc}
     */
    public V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException {
        if (unit == null)
            throw new NullPointerException();
        int s = state;
        if (s <= COMPLETING &&
            (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
            throw new TimeoutException();
        return report(s);
    }

两个重载方法,一个传入了最大等待时间
如果状态的<= COMPLETING 也就是NEW 和COMPLETING 就进入等待的方法

 private int awaitDone(boolean timed, long nanos)
        throws InterruptedException {
        // The code below is very delicate, to achieve these goals:
        // - call nanoTime exactly once for each call to park
        // - if nanos <= 0L, return promptly without allocation or nanoTime
        // - if nanos == Long.MIN_VALUE, don't underflow
        // - if nanos == Long.MAX_VALUE, and nanoTime is non-monotonic
        //   and we suffer a spurious wakeup, we will do no worse than
        //   to park-spin for a while
        long startTime = 0L;    // Special value 0L means not yet parked
        WaitNode q = null;  //记录当前线程 
        boolean queued = false;  //记录是否把当前线程加入到了队列
        for (;;) {
            int s = state;
            if (s > COMPLETING) { //大于计算中,代表完成||中断||取消 直接返回 
                if (q != null)
                    q.thread = null; //当前线程设置为空,finishCompletion() 会判断 链表队列里的线程是否为空 
                return s;
            }
            else if (s == COMPLETING) //如果在计算中 放开cpu资源 
                // We may have already promised (via isDone) that we are done
                // so never return empty-handed or throw InterruptedException
                Thread.yield();
            else if (Thread.interrupted()) { //如果当前线程被中断 
                removeWaiter(q); //从等待队列中移除当前线程对象
                throw new InterruptedException(); 
            }
            else if (q == null) {  //这里只有第一次循环会进入
                if (timed && nanos <= 0L) //如果是等待一段时间,并且时间小于等于0 直接退出了 
                    return s;
                q = new WaitNode(); //new 一个链表元素  这里new的时候会把当前线程对象放进这个元素里面 
            }
            else if (!queued) //如果没加入 FutureTask的链表里就加入 
                queued = WAITERS.weakCompareAndSet(this, q.next = waiters, q); //把waiters 属性设置成q的下一个 替换 waiters 属性为q
            else if (timed) { //设置了等待时间才会进入
                final long parkNanos; //记录时间
                if (startTime == 0L) { // first time  第一次进入记录开始时间
                    startTime = System.nanoTime();
                    if (startTime == 0L)
                        startTime = 1L;
                    parkNanos = nanos;
                } else { //第二次进入
                    long elapsed = System.nanoTime() - startTime; //加上 刚才使用的时间
                    if (elapsed >= nanos) {//如果已经超过等待时间直接退出了
                        removeWaiter(q);
                        return state;
                    }
                    parkNanos = nanos - elapsed; //计算还要等待的时间
                }
                // nanoTime may be slow; recheck before parking
                if (state < COMPLETING)//重新check 下状态
                    LockSupport.parkNanos(this, parkNanos); //挂起线程剩下的时间  这里是毫秒
            }
            else
                LockSupport.park(this); // 不设置时间,挂起线程 
        }
    }

梳理下流程,
1.先判断状态
2.把线程加入等待队列

  1. 有等待时间就 park 那么多时间
  2. 没等待时间就直接 park
  3. 其它线程唤醒 重新判断状态
    这个方法会返回这个状态,计算等待结束后的状态。可能是完成,异常等。然后会掉report方法
    private V report(int s) throws ExecutionException {
        Object x = outcome;
        if (s == NORMAL)
            return (V)x;
        if (s >= CANCELLED)
            throw new CancellationException();
        throw new ExecutionException((Throwable)x);
    }

正常就返回结果,否则会抛个异常出去。使用我们自己写的方法出现了异常最终会在这里抛出来。

isCancelled

  public boolean isCancelled() {
        return state >= CANCELLED;
    }

也很简单了

cancel

要注意一点,java是不能直接强制停掉线程的。这里取消是使用中断的方式 interrupt ,然后把状态修改为INTERRUPTING或者CANCELLED
中断一个线程并不会停掉线程,只是告诉线程你要停掉了,被中断的线程可以使用Thread.interrupted() 来查看我是不是被中断了,另外线程在堵塞时如果被中断会从堵塞中恢复抛出一个中断异常InterruptedException。

   public boolean cancel(boolean mayInterruptIfRunning) {
        if (!(state == NEW && STATE.compareAndSet
              (this, NEW, mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
            return false;
        try {    // in case call to interrupt throws exception
            if (mayInterruptIfRunning) {
                try {
                    Thread t = runner;
                    if (t != null)
                        t.interrupt();
                } finally { // final state
                    STATE.setRelease(this, INTERRUPTED);
                }
            }
        } finally {
            finishCompletion();//唤醒等待队列
        }
        return true;
    }

取消只会在状态为NEW时才有效,但是在任务计算完之前 状态都是NEW
run方法的状态判断

 if (state != NEW ||
            !RUNNER.compareAndSet(this, null, Thread.currentThread())) //如果状态不是new或者将runner属性赋值为当前线程时失败就直接返回,这里表明了我们的任务只能启动一次。
            return;

所以你提交的任务并不会在计算时停掉,如果在计算的run方法的状态判断前那么不会计算。
如果在判断后,那么计算还是会进行,但结果不会存储,并且在get时会抛这个异常。

     if (s >= CANCELLED)
            throw new CancellationException();

这个方法的参数mayInterruptIfRunning说一下。

最后

其实里面的方法不是很难,重点是在并发时的处理和设计,比如get的同时取消,多个线程同时get等,多个方法通过共享变量来通信和协调。这个jdk的类,所以可以当作一个标准优秀的案例来参考和学习。这是作者第一次写文章,如果有不对的地方大家一定要提出来。

上一篇下一篇

猜你喜欢

热点阅读