java Future,FutureTask,Callable
Future
Future的作用就是在主线程之外另外起一个线程去做另一件事,让java同时可以并行处理。并且提供方法随时去查看Future的是否完成,并取得任务的结果。本篇文章会详细讲解流程和每个方法。
先写一个小例子:
这个例子中要做3件事,其中job2和job3各耗时2000毫秒,而Future可以让他们同时进行
long a = System.currentTimeMillis();
System.out.println("do job1");
Callable<String> callable = () -> { //子线程任务,实现Callable<T> 接口,返回值为T
Thread.sleep(2000);
System.out.println("do job2");
return "job2 is done";
};
FutureTask<String> task = new FutureTask<>(callable); //new 一个FutureTask 将任务引用传递进去
new Thread(task).start(); //启动Future任务
Thread.sleep(2000);
System.out.println("do job3");
while (!task.isDone()) { //循环判断Future任务是否完成了
Thread.onSpinWait(); //相当于Thread.sleep(); 不过效率更高
}
System.out.println("job2结果 => " + task.get() + "时间:" + (System.currentTimeMillis() - a));
结果
do job1
do job2
do job3
job2结果 => job2 is done时间:2006
可以看到Future确实让任务并行处理了。
我们这里使用了isDone()方法查询计算是否完成了,下面细节讲解运行原理和它提供的方法。
Future提供的方法
FutureTask实现了Future<V>接口,该接口提供了以下方法:
- boolean cancel(boolean mayInterruptIfRunning);//取消计算
- boolean isCancelled(); //判断计算是否被取消。
- boolean isDone(); //判断计算是否结束。
- V get();//获取结果,如果计算没结束会一直堵塞到结束。
- V get(long timeout, TimeUnit unit) ;//获取结果,如果计算没结束最多等待一定时间。
运行原理和流程
1.首先先看下FutureTask 有哪些属性呢
- 运行状态的判断:
private volatile int state; //存储状态的变量 使用volatile
private static final int NEW = 0; //代表是刚刚创建没进行如何操作
private static final int COMPLETING = 1; //正在计算中
private static final int NORMAL = 2; //正常
private static final int EXCEPTIONAL = 3; //发生了异常
private static final int CANCELLED = 4; //被取消了
private static final int INTERRUPTING = 5; //中断中
private static final int INTERRUPTED = 6; //以被中断
- 其他属性
/** The underlying callable; nulled out after running */
private Callable<V> callable; //我们提交的任务
/** The result to return or exception to throw from get() */
private Object outcome; //存储结果 或者异常
/** The thread running the callable; CASed during run() */
private volatile Thread runner; //记录运行的线程
/** Treiber stack of waiting threads */
private volatile WaitNode waiters; //等待结果的队列
2.下面讲解运行原理和流程
首先我们创建了一个Callable 记录我们任务的方法,然后new FutureTask 会把Callable赋值给callable属性,将state 设置为NEW也就是0。然后启动线程调用run方法
public void run() {
if (state != NEW ||
!RUNNER.compareAndSet(this, null, Thread.currentThread())) //如果状态不是new或者将runner属性赋值为当前线程时失败就直接返回,这里表明了我们的任务只能启动一次。
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) { // callable 不为空,状态为new才继续计算
V result;
boolean ran;//flag记录是否计算成功
try {
result = c.call();//调用我们的方法
ran = true;//完成了 把flag设置为ture
} catch (Throwable ex) { //有异常 结果设置为空 flag设为false
result = null;
ran = false;
setException(ex); //处理异常的方法
}
if (ran) //计算成功
set(result);
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
if (s >= INTERRUPTING) //如果有其它线程在中断任务会等到中断结束
handlePossibleCancellationInterrupt(s);
}
}
先讲解下CAS 这里面好多CAS操作
RUNNER.compareAndSet(this, null, Thread.currentThread())这里VarHandle 来提供CAS操作,我这里是jdk11,你们的版本可能是用unsafe来操作的。他们的作用是一样的,就是先获取某个属性相对对象的内存地址偏移值,然后用CAS方式修改属性的值,因为一个对象属性在内存中的排列是固定的,这样只要有对象的地址和属性的偏移值就能定位属性在内存的地址。这里的RUNNER 是这样来的,可以看到定位了FutureTask这个类的runner属性,类型是Thread
MethodHandles.Lookup l = MethodHandles.lookup();
RUNNER = l.findVarHandle(FutureTask.class, "runner", Thread.class);
RUNNER.compareAndSet(A,B,C)方法:修改A对象的RUNNER所代表的属性,如果当前值是B就修改为C,如果失败会一直重试直到,当前属性不是B了或者修改成功。
使用unsafe也是一样的,只不过不会去先取得VarHandle这个句柄,而是在调用时将属性偏移值一起传递进去。
if (state != NEW ||
!RUNNER.compareAndSet(this, null, Thread.currentThread()))
所以意思计算如果状态不是new或者将runner属性赋值为当前线程时失败
继续
发生异常
后面就是调用我们自己写的方法了,如果发送了异常我们看下
protected void setException(Throwable t) {
if (STATE.compareAndSet(this, NEW, COMPLETING)) {
outcome = t;
STATE.setRelease(this, EXCEPTIONAL); // final state
finishCompletion();//唤起所有等待线程
}
}
就是先发状态设置为计算中,然后记录下异常,在把状态设置为EXCEPTIONAL异常,注意setRelease方法表示设置值后这个值不能再被修改。
计算完成
protected void set(V v) {
if (STATE.compareAndSet(this, NEW, COMPLETING)) {
outcome = v;
STATE.setRelease(this, NORMAL); // final state
finishCompletion();
}
}
其实和发送异常是一样的,只是记录状态不一样
都会调finishCompletion()方法
private void finishCompletion() {
// assert state > COMPLETING;
for (WaitNode q; (q = waiters) != null;) { //方法内变量 记录链表
if (WAITERS.weakCompareAndSet(this, q, null)) {//将属性的设置为空
for (;;) {
Thread t = q.thread;
if (t != null) {
q.thread = null;
LockSupport.unpark(t);
}
WaitNode next = q.next;
if (next == null)
break;
q.next = null; // unlink to help gc
q = next;
}
break;
}
}
done();//空方法 ,给用户扩展用
callable = null; // to reduce footprint
}
这里的WaitNode 是一个链表,记录使用等待结果的线程。上面说的get方法会堵塞直到计算完成,被堵塞的线程就会存储到这个链表。
for (WaitNode q; (q = waiters) != null;) { //方法内变量 记录链表
if (WAITERS.weakCompareAndSet(this, q, null)) {//将属性的设置为空
//这里可能会有新的队列
这里会一直循环,防止将waiters属性赋值为空之后又有新的线程加入到队列中
然后循环链表把线程唤起使用 LockSupport.unpark(t),这个方法点进去可以发现其实用的是前面提到的Unsafe,juc的包很多都是用Unsafe实现的。
下面讲解各个方法
isDone
public boolean isDone() {
return state != NEW;
}
非常简单。。。
get
/**
* @throws CancellationException {@inheritDoc}
*/
public V get() throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING)
s = awaitDone(false, 0L);
return report(s);
}
/**
* @throws CancellationException {@inheritDoc}
*/
public V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
if (unit == null)
throw new NullPointerException();
int s = state;
if (s <= COMPLETING &&
(s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
throw new TimeoutException();
return report(s);
}
两个重载方法,一个传入了最大等待时间
如果状态的<= COMPLETING 也就是NEW 和COMPLETING 就进入等待的方法
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
// The code below is very delicate, to achieve these goals:
// - call nanoTime exactly once for each call to park
// - if nanos <= 0L, return promptly without allocation or nanoTime
// - if nanos == Long.MIN_VALUE, don't underflow
// - if nanos == Long.MAX_VALUE, and nanoTime is non-monotonic
// and we suffer a spurious wakeup, we will do no worse than
// to park-spin for a while
long startTime = 0L; // Special value 0L means not yet parked
WaitNode q = null; //记录当前线程
boolean queued = false; //记录是否把当前线程加入到了队列
for (;;) {
int s = state;
if (s > COMPLETING) { //大于计算中,代表完成||中断||取消 直接返回
if (q != null)
q.thread = null; //当前线程设置为空,finishCompletion() 会判断 链表队列里的线程是否为空
return s;
}
else if (s == COMPLETING) //如果在计算中 放开cpu资源
// We may have already promised (via isDone) that we are done
// so never return empty-handed or throw InterruptedException
Thread.yield();
else if (Thread.interrupted()) { //如果当前线程被中断
removeWaiter(q); //从等待队列中移除当前线程对象
throw new InterruptedException();
}
else if (q == null) { //这里只有第一次循环会进入
if (timed && nanos <= 0L) //如果是等待一段时间,并且时间小于等于0 直接退出了
return s;
q = new WaitNode(); //new 一个链表元素 这里new的时候会把当前线程对象放进这个元素里面
}
else if (!queued) //如果没加入 FutureTask的链表里就加入
queued = WAITERS.weakCompareAndSet(this, q.next = waiters, q); //把waiters 属性设置成q的下一个 替换 waiters 属性为q
else if (timed) { //设置了等待时间才会进入
final long parkNanos; //记录时间
if (startTime == 0L) { // first time 第一次进入记录开始时间
startTime = System.nanoTime();
if (startTime == 0L)
startTime = 1L;
parkNanos = nanos;
} else { //第二次进入
long elapsed = System.nanoTime() - startTime; //加上 刚才使用的时间
if (elapsed >= nanos) {//如果已经超过等待时间直接退出了
removeWaiter(q);
return state;
}
parkNanos = nanos - elapsed; //计算还要等待的时间
}
// nanoTime may be slow; recheck before parking
if (state < COMPLETING)//重新check 下状态
LockSupport.parkNanos(this, parkNanos); //挂起线程剩下的时间 这里是毫秒
}
else
LockSupport.park(this); // 不设置时间,挂起线程
}
}
梳理下流程,
1.先判断状态
2.把线程加入等待队列
- 有等待时间就 park 那么多时间
- 没等待时间就直接 park
- 其它线程唤醒 重新判断状态
这个方法会返回这个状态,计算等待结束后的状态。可能是完成,异常等。然后会掉report方法
private V report(int s) throws ExecutionException {
Object x = outcome;
if (s == NORMAL)
return (V)x;
if (s >= CANCELLED)
throw new CancellationException();
throw new ExecutionException((Throwable)x);
}
正常就返回结果,否则会抛个异常出去。使用我们自己写的方法出现了异常最终会在这里抛出来。
isCancelled
public boolean isCancelled() {
return state >= CANCELLED;
}
也很简单了
cancel
要注意一点,java是不能直接强制停掉线程的。这里取消是使用中断的方式 interrupt ,然后把状态修改为INTERRUPTING或者CANCELLED
中断一个线程并不会停掉线程,只是告诉线程你要停掉了,被中断的线程可以使用Thread.interrupted() 来查看我是不是被中断了,另外线程在堵塞时如果被中断会从堵塞中恢复抛出一个中断异常InterruptedException。
public boolean cancel(boolean mayInterruptIfRunning) {
if (!(state == NEW && STATE.compareAndSet
(this, NEW, mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
return false;
try { // in case call to interrupt throws exception
if (mayInterruptIfRunning) {
try {
Thread t = runner;
if (t != null)
t.interrupt();
} finally { // final state
STATE.setRelease(this, INTERRUPTED);
}
}
} finally {
finishCompletion();//唤醒等待队列
}
return true;
}
取消只会在状态为NEW时才有效,但是在任务计算完之前 状态都是NEW
run方法的状态判断
if (state != NEW ||
!RUNNER.compareAndSet(this, null, Thread.currentThread())) //如果状态不是new或者将runner属性赋值为当前线程时失败就直接返回,这里表明了我们的任务只能启动一次。
return;
所以你提交的任务并不会在计算时停掉,如果在计算的run方法的状态判断前那么不会计算。
如果在判断后,那么计算还是会进行,但结果不会存储,并且在get时会抛这个异常。
if (s >= CANCELLED)
throw new CancellationException();
这个方法的参数mayInterruptIfRunning说一下。
- ture 代表会中断线程,状态会改为INTERRUPTING 并且中断interrupt() 中断后会把状态改成INTERRUPTED
- flase 则只是会 把状态修改为CANCELLED
最后
其实里面的方法不是很难,重点是在并发时的处理和设计,比如get的同时取消,多个线程同时get等,多个方法通过共享变量来通信和协调。这个jdk的类,所以可以当作一个标准优秀的案例来参考和学习。这是作者第一次写文章,如果有不对的地方大家一定要提出来。