An Analysis of How a Future Returned by ExecutorService Executes

2019-03-05  JoneSnow

ExecutorService executorService = new ThreadPoolExecutor(0,
2000, 60L , TimeUnit.SECONDS, new SynchronousQueue<Runnable>());

0: corePoolSize, the number of threads to keep in the pool, even if they are idle, unless {@code allowCoreThreadTimeOut} is set
2000: maximumPoolSize, the maximum number of threads to allow in the pool
60L, TimeUnit.SECONDS: keepAliveTime and its unit. When the number of threads is greater than the core, this is the maximum time that excess idle threads will wait for new tasks before terminating. The pool stores this value internally in nanoseconds. Threads use this timeout when there are more than corePoolSize present or if allowCoreThreadTimeOut is set; otherwise they wait forever for new work.
new SynchronousQueue<Runnable>(): the work queue. It has no capacity, so a submitted task is either handed directly to an idle thread or a new thread is started for it.

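Initially the pool contains no threads. It may create at most 2000 threads, and an idle thread is kept for at most 60 s before it terminates.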
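The behavior described above can be checked with a small sketch. The class name PoolGrowthDemo and the helper poolSizes are hypothetical names introduced for illustration; the sketch assumes every submitted task is still blocked when the pool size is read, so each task needs its own thread.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the original article.
final class PoolGrowthDemo {
    /** Returns {pool size before any task, pool size while all tasks are blocked}. */
    static int[] poolSizes(int tasks) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                0, 2000, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
        int before = pool.getPoolSize();            // corePoolSize is 0, so no thread exists yet
        CountDownLatch started = new CountDownLatch(tasks);
        CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < tasks; i++) {
                pool.execute(() -> {
                    started.countDown();
                    try {
                        release.await();            // keep this worker busy
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            started.await();                        // every task is now running
            return new int[] {before, pool.getPoolSize()};
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            release.countDown();                    // let the workers finish
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        int[] sizes = poolSizes(3);
        System.out.println(sizes[0] + " -> " + sizes[1]);
    }
}
```

Because the SynchronousQueue cannot hold a task while all workers are busy, each of the three blocked tasks gets a thread of its own.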

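// The lambda returns a value, so it is treated as a Callable and may throw InterruptedException from sleep
Future<?> future = executorService.submit(() -> { Thread.sleep(1000); return null; });
Object a = future.get();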

The actual runtime type of the Future is FutureTask. Its get method:
/**
 * @throws CancellationException {@inheritDoc}
 */
public V get() throws InterruptedException, ExecutionException {
    int s = state;
    if (s <= COMPLETING)
        s = awaitDone(false, 0L);
    return report(s);
}
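The claim that submit hands back a FutureTask can be verified at run time. This is a minimal sketch; SubmitTypeDemo and futureClassName are hypothetical names, and it assumes the pool is a plain ThreadPoolExecutor as constructed earlier (a subclass could override how the task is wrapped).

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the original article.
final class SubmitTypeDemo {
    /** Submits a trivial task and returns the runtime class name of the returned Future. */
    static String futureClassName() {
        ExecutorService executorService = new ThreadPoolExecutor(
                0, 2000, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
        try {
            Future<String> future = executorService.submit(() -> "done");
            future.get();                           // blocks until the task has run
            return future.getClass().getName();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            executorService.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(futureClassName());
    }
}
```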

When a FutureTask is created, its state is NEW:
/**
 * Creates a {@code FutureTask} that will, upon running, execute the
 * given {@code Callable}.
 *
 * @param callable the callable task
 * @throws NullPointerException if the callable is null
 */
public FutureTask(Callable<V> callable) {
    if (callable == null)
        throw new NullPointerException();
    this.callable = callable;
    this.state = NEW; // ensure visibility of callable
}

The possible values of state are:
private volatile int state;
private static final int NEW = 0;
private static final int COMPLETING = 1;
private static final int NORMAL = 2;
private static final int EXCEPTIONAL = 3;
private static final int CANCELLED = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED = 6;
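The state field is private, so it cannot be read directly, but its terminal values show up through the public API. The sketch below is an illustration under that assumption: FutureTaskOutcomes and outcome are hypothetical names, and the labels in the returned strings are only a mapping onto the constants above (get() returns normally for NORMAL, throws ExecutionException for EXCEPTIONAL, and throws CancellationException for CANCELLED and the interrupt states).

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

// Hypothetical demo class, not part of the original article.
final class FutureTaskOutcomes {
    /** Describes how get() ends for a task that has already reached a terminal state. */
    static String outcome(FutureTask<Integer> task) {
        try {
            return "NORMAL:" + task.get();
        } catch (ExecutionException e) {
            return "EXCEPTIONAL:" + e.getCause().getMessage();
        } catch (CancellationException e) {
            return "CANCELLED";
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    static String normal() {
        FutureTask<Integer> task = new FutureTask<Integer>(() -> 42);
        task.run();                                 // completes with a value
        return outcome(task);
    }

    static String exceptional() {
        FutureTask<Integer> task = new FutureTask<Integer>(() -> {
            throw new IllegalStateException("boom");
        });
        task.run();                                 // the exception is captured, not thrown here
        return outcome(task);
    }

    static String cancelled() {
        FutureTask<Integer> task = new FutureTask<Integer>(() -> 1);
        task.cancel(false);                         // cancelled before it ever runs
        return outcome(task);
    }

    public static void main(String[] args) {
        System.out.println(normal());
        System.out.println(exceptional());
        System.out.println(cancelled());
    }
}
```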

awaitDone waits until the asynchronous task has finished:

/**
 * Awaits completion or aborts on interrupt or timeout.
 *
 * @param timed true if use timed waits
 * @param nanos time to wait, if timed
 * @return state upon completion
 */
private int awaitDone(boolean timed, long nanos)
    throws InterruptedException {
    final long deadline = timed ? System.nanoTime() + nanos : 0L; // get() passes timed = false, so deadline is 0 here
    WaitNode q = null;
    boolean queued = false;
    for (;;) {
        if (Thread.interrupted()) {
            removeWaiter(q);
            throw new InterruptedException();
        }

        int s = state;
        if (s > COMPLETING) {
            if (q != null)
                q.thread = null;
            return s;
        }
        else if (s == COMPLETING) // cannot time out yet
            Thread.yield();
        else if (q == null)
            q = new WaitNode();
        else if (!queued)
            queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                 q.next = waiters, q);
        else if (timed) {
            nanos = deadline - System.nanoTime();
            if (nanos <= 0L) {
                removeWaiter(q);
                return state;
            }
            LockSupport.parkNanos(this, nanos);
        }
        else
            LockSupport.park(this);
    }
}

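On the first pass through the loop (assuming the task has not finished yet), execution takes this branch:
else if (q == null)
    q = new WaitNode();

This creates a WaitNode for the calling thread.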

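On the second pass:
else if (!queued)
    queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                         q.next = waiters, q);

This CAS pushes the node onto the head of the waiters list. waitersOffset is the offset of the waiters field, obtained in FutureTask's static initializer:
waitersOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("waiters"));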

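On the third pass, if the task has still not finished, execution reaches
LockSupport.park(this);
This suspends the calling thread (the main thread in this example), which is why a call to get() blocks. When the task completes, FutureTask unparks the waiting threads; the loop then sees s > COMPLETING and awaitDone returns.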
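The three passes can be reproduced in a much smaller class. What follows is a simplified sketch, not the JDK implementation: MiniFuture, set, and demo are hypothetical names, a boolean replaces the int state, AtomicReference replaces the Unsafe CAS, and an interrupted waiter is not unlinked from the list.

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.LockSupport;

// Hypothetical, simplified stand-in for FutureTask's waiting logic.
final class MiniFuture<V> {
    private static final class WaitNode {
        volatile Thread thread = Thread.currentThread();
        volatile WaitNode next;
    }

    private volatile boolean done;                  // stands in for FutureTask's state
    private V result;                               // published by the volatile write to done
    private final AtomicReference<WaitNode> waiters = new AtomicReference<>();

    V get() throws InterruptedException {
        WaitNode q = null;
        boolean queued = false;
        for (;;) {
            if (Thread.interrupted())
                throw new InterruptedException();   // simplification: the node stays in the list
            if (done)
                return result;
            else if (q == null)
                q = new WaitNode();                 // pass 1: create the node
            else if (!queued) {
                WaitNode head = waiters.get();
                q.next = head;
                queued = waiters.compareAndSet(head, q); // pass 2: CAS push onto the list
            } else
                LockSupport.park(this);             // pass 3: suspend the caller
        }
    }

    void set(V value) {
        result = value;
        done = true;
        // Detach the whole list, then wake every waiter so it re-checks done.
        for (WaitNode q = waiters.getAndSet(null); q != null; q = q.next) {
            Thread t = q.thread;
            if (t != null) {
                q.thread = null;
                LockSupport.unpark(t);
            }
        }
    }

    /** Completes the future from another thread while the caller waits in get(). */
    static String demo() {
        MiniFuture<String> future = new MiniFuture<>();
        Thread worker = new Thread(() -> {
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            future.set("hello");
        });
        worker.start();
        try {
            return future.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The loop re-checks done after every step, so a completion that happens between enqueueing and parking is not lost: either the waiter is already in the list and gets unparked, or it sees done on its next pass.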

For how the pool schedules tasks, see java.util.concurrent.ThreadPoolExecutor.
