Thread Pools: Units of Work

2019-07-18  王侦

1.Runnable interface

The Runnable interface should be implemented by any class 
whose instances are intended to be executed by a thread. The 
class must define a method of no arguments called run.

This interface is designed to provide a common protocol for 
objects that wish to execute code while they are active. For 
example, Runnable is implemented by class Thread. Being active 
simply means that a thread has been started and has not yet 
been stopped.

In addition, Runnable provides the means for a class to be active 
while not subclassing Thread. A class that implements Runnable 
can run without subclassing Thread by instantiating a Thread 
instance and passing itself in as the target. In most cases, the 
Runnable interface should be used if you are only planning to 
override the run() method and no other Thread methods. This is 
important because classes should not be subclassed unless the 
programmer intends on modifying or enhancing the fundamental 
behavior of the class.

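The Runnable interface is implemented by any class whose instances are meant to be executed by a thread. The class must define a no-argument method named run.

Runnable offers a way to be active without subclassing Thread. In most cases, if you only intend to override run() and no other Thread method, use the Runnable interface. A class should not be subclassed unless you intend to modify or enhance its fundamental behavior.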

@FunctionalInterface
public interface Runnable {
    public abstract void run();
}
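
As a minimal sketch of the "pass itself in as the target" idea, the example below hands a Runnable to a Thread instead of subclassing Thread. The class name RunnableDemo and the method runOnce are hypothetical names chosen for illustration.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; none of these names come from the JDK.
class RunnableDemo {

    /** Runs a Runnable on a new thread and returns how many times run() executed. */
    static int runOnce() {
        AtomicInteger runs = new AtomicInteger();

        // Runnable is a functional interface, so a lambda can supply run().
        Runnable task = () -> runs.incrementAndGet();

        // The Runnable is passed to Thread as its target; Thread is not subclassed.
        Thread worker = new Thread(task, "demo-worker");
        worker.start();
        try {
            worker.join(); // wait until run() has returned
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return runs.get();
    }

    public static void main(String[] args) {
        System.out.println("run() executed " + runOnce() + " time(s)");
    }
}
```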

2.Callable interface

A task that returns a result and may throw an exception. 
Implementors define a single method with no arguments called 
call.

The Callable interface is similar to Runnable, in that both are 
designed for classes whose instances are potentially executed by 
another thread. A Runnable, however, does not return a result 
and cannot throw a checked exception.

The Executors class contains utility methods to convert from 
other common forms to Callable classes.

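A task that returns a result and may throw an exception. Implementors define a single no-argument method named call.

Callable is similar to Runnable in that instances of both may be executed by another thread. A Runnable, however, cannot return a result or throw a checked exception.

The Executors class contains utility methods that convert other common forms into Callable classes.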

@FunctionalInterface
public interface Callable<V> {
    V call() throws Exception;
}
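
The difference from Runnable can be sketched as follows: a Callable is submitted to an ExecutorService and its return value is read from the Future, and Executors.callable adapts a Runnable into a Callable with a fixed result. The class CallableDemo and its methods sumTo and adapt are hypothetical names used only for this illustration.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical demo class for illustration only.
class CallableDemo {

    /** Submits a Callable that sums 1..n and returns the computed result. */
    static int sumTo(int n) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Callable<Integer> task = () -> {
                int sum = 0;
                for (int i = 1; i <= n; i++) sum += i;
                return sum; // unlike run(), call() returns a value
            };
            Future<Integer> future = executor.submit(task);
            return future.get(); // blocks until the result is ready
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            executor.shutdown();
        }
    }

    /** Adapts a Runnable into a Callable that returns the given fixed result. */
    static String adapt(Runnable r, String result) {
        Callable<String> c = Executors.callable(r, result);
        try {
            return c.call(); // runs r, then returns result
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(sumTo(10));
        System.out.println(adapt(() -> { }, "done"));
    }
}
```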

3.Future

A Future represents the result of an asynchronous computation.
 Methods are provided to check if the computation is complete, to 
wait for its completion, and to retrieve the result of the 
computation. The result can only be retrieved using method get 
when the computation has completed, blocking if necessary until 
it is ready. Cancellation is performed by the cancel method. 
Additional methods are provided to determine if the task 
completed normally or was cancelled. Once a computation has 
completed, the computation cannot be cancelled. If you would 
like to use a Future for the sake of cancellability but not provide a 
usable result, you can declare types of the form Future<?> and 
return null as a result of the underlying task.

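A Future represents the result of an asynchronous computation. It provides methods to check whether the computation is complete, to wait for its completion, and to retrieve its result. The result can only be retrieved with get once the computation has completed; get blocks if necessary until it is ready. Cancellation is performed by the cancel method, and additional methods report whether the task completed normally or was cancelled. Once a computation has completed, it cannot be cancelled. If you want a Future for the sake of cancellability but have no usable result to provide, declare the type as Future<?> and return null as the result of the underlying task.

Sample usage (the classes below are all made up):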

 interface ArchiveSearcher { String search(String target); }
 class App {
   ExecutorService executor = ...
   ArchiveSearcher searcher = ...
   void showSearch(final String target)
       throws InterruptedException {
     Future<String> future
       = executor.submit(new Callable<String>() {
         public String call() {
             return searcher.search(target);
         }});
     displayOtherThings(); // do other things while searching
     try {
       displayText(future.get()); // use future
     } catch (ExecutionException ex) { cleanup(); return; }
   }
 }

The FutureTask class is an implementation of Future that also implements Runnable, so it can be executed by an Executor. The submit construction above could be replaced by:

 FutureTask<String> future =
   new FutureTask<String>(new Callable<String>() {
     public String call() {
       return searcher.search(target);
   }});
 executor.execute(future);

The complete Future interface is as follows:

public interface Future<V> {
    boolean cancel(boolean mayInterruptIfRunning);

    boolean isCancelled();

    boolean isDone();

    V get() throws InterruptedException, ExecutionException;

    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

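Notes on the cancel method: it attempts to cancel execution of the task. The attempt fails (returns false) if the task has already completed or has already been cancelled. If the task has not started when cancel is called, it should never run. If it has already started, mayInterruptIfRunning determines whether the thread executing the task is interrupted in an attempt to stop it. After cancel returns, isDone() always returns true, and isCancelled() returns true if cancel returned true.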
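
The cancel contract can be observed on a task that is cancelled before it starts. The sketch below uses FutureTask as the Future implementation; the class CancelBeforeStartDemo and the method observe are hypothetical names, and the comma-separated summary string is just a convenient way to report what happened.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical demo class for illustration only.
class CancelBeforeStartDemo {

    /** Cancels a task that never started and reports what the Future then shows. */
    static String observe() {
        AtomicBoolean bodyRan = new AtomicBoolean();
        Callable<String> body = () -> {
            bodyRan.set(true);
            return "never computed";
        };
        FutureTask<String> task = new FutureTask<>(body);

        boolean first = task.cancel(false);  // task not started yet
        boolean second = task.cancel(false); // task already cancelled
        task.run();                          // a cancelled task does not run its body

        String getResult;
        try {
            getResult = task.get();
        } catch (CancellationException e) {
            getResult = "CancellationException"; // get() on a cancelled task
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        }
        return first + "," + second + "," + task.isCancelled() + ","
                + task.isDone() + "," + bodyRan.get() + "," + getResult;
    }

    public static void main(String[] args) {
        System.out.println(observe());
    }
}
```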

4.RunnableFuture

A Future that is Runnable. Successful execution of the run 
method causes completion of the Future and allows access to its 
results.

A Future that is Runnable. Successful execution of the run method completes the Future and allows access to its results.

public interface RunnableFuture<V> extends Runnable, Future<V> {
    /**
     * Sets this Future to the result of its computation
     * unless it has been cancelled.
     */
    void run();
}

5.FutureTask

A cancellable asynchronous computation. This class provides a 
base implementation of Future, with methods to start and cancel 
a computation, query to see if the computation is complete, and 
retrieve the result of the computation. The result can only be 
retrieved when the computation has completed; the get methods 
will block if the computation has not yet completed. Once the 
computation has completed, the computation cannot be restarted 
or cancelled (unless the computation is invoked using 
runAndReset()).

A FutureTask can be used to wrap a Callable or Runnable object. 
Because FutureTask implements Runnable, a FutureTask can be 
submitted to an Executor for execution.

In addition to serving as a standalone class, this class provides 
protected functionality that may be useful when creating 
customized task classes.

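A cancellable asynchronous computation. This class provides a base implementation of Future, with methods to start and cancel a computation, query whether it is complete, and retrieve its result. The result can only be retrieved once the computation has completed; the get methods block if it has not. Once the computation has completed, it cannot be restarted or cancelled (unless it is invoked using runAndReset()).

A FutureTask can wrap a Callable or Runnable object. Because FutureTask implements Runnable, a FutureTask can be submitted to an Executor for execution.

In addition to serving as a standalone class, this class provides protected functionality that may be useful when creating customized task classes.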
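
One piece of that protected functionality is the done() hook, which FutureTask invokes when the task reaches a final state. The following sketch overrides it to count completions; DoneHookDemo, NotifyingTask and runAndObserve are hypothetical names introduced here.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class for illustration only.
class DoneHookDemo {

    /** A customized task class that counts how often done() is invoked. */
    static final class NotifyingTask extends FutureTask<Integer> {
        final AtomicInteger doneCalls = new AtomicInteger();

        NotifyingTask(Callable<Integer> callable) {
            super(callable);
        }

        @Override
        protected void done() {
            doneCalls.incrementAndGet(); // called once the task has completed
        }
    }

    /** Returns {result, number of done() calls} after running the task twice. */
    static int[] runAndObserve() {
        NotifyingTask task = new NotifyingTask(() -> 6 * 7);
        task.run();
        task.run(); // second call returns immediately: the task is already complete
        try {
            return new int[] { task.get(), task.doneCalls.get() };
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        int[] r = runAndObserve();
        System.out.println("result=" + r[0] + ", done() calls=" + r[1]);
    }
}
```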

Revision notes: This differs from previous versions of this
class that relied on AbstractQueuedSynchronizer, mainly to
avoid surprising users about retaining interrupt status during
cancellation races. Sync control in the current design relies
on a "state" field updated via CAS to track completion, along
with a simple Treiber stack to hold waiting threads.

Style note: As usual, we bypass overhead of using
AtomicXFieldUpdaters and instead directly use Unsafe intrinsics.

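Revision notes: earlier versions of this class relied on AbstractQueuedSynchronizer (AQS). The current design moved away from it mainly to avoid surprising users with retained interrupt status during cancellation races. Synchronization now relies on a state field updated via CAS to track completion, plus a simple Treiber stack that holds the waiting threads.

Style note: as usual, the overhead of AtomicXFieldUpdaters is bypassed by using Unsafe intrinsics directly.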

5.1 State transitions

    /**
     * The run state of this task, initially NEW.  The run state
     * transitions to a terminal state only in methods set,
     * setException, and cancel.  During completion, state may take on
     * transient values of COMPLETING (while outcome is being set) or
     * INTERRUPTING (only while interrupting the runner to satisfy a
     * cancel(true)). Transitions from these intermediate to final
     * states use cheaper ordered/lazy writes because values are unique
     * and cannot be further modified.
     *
     * Possible state transitions:
     * NEW -> COMPLETING -> NORMAL
     * NEW -> COMPLETING -> EXCEPTIONAL
     * NEW -> CANCELLED
     * NEW -> INTERRUPTING -> INTERRUPTED
     */
    private volatile int state;
    private static final int NEW          = 0;
    private static final int COMPLETING   = 1;
    private static final int NORMAL       = 2;
    private static final int EXCEPTIONAL  = 3;
    private static final int CANCELLED    = 4;
    private static final int INTERRUPTING = 5;
    private static final int INTERRUPTED  = 6;

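A task's run state is initially NEW. It transitions to a terminal state only in the set, setException, and cancel methods. During completion, state may briefly take the transient value COMPLETING (while the outcome is being set) or INTERRUPTING (only while interrupting the runner to satisfy a cancel(true)). Transitions from these intermediate states to final states use cheaper ordered/lazy writes, because the values are unique and cannot be modified further.

Possible state transitions:

NEW -> COMPLETING -> NORMAL
NEW -> COMPLETING -> EXCEPTIONAL
NEW -> CANCELLED
NEW -> INTERRUPTING -> INTERRUPTED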
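
The "CAS into a transient state, then lazy write of the final state" pattern can be sketched with public APIs. This is not the JDK implementation: it assumes AtomicInteger in place of Unsafe, with lazySet playing the role of the ordered write, and the class StateSketch with its complete method is a hypothetical simplification covering only the NEW -> COMPLETING -> NORMAL path.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, simplified model of one FutureTask transition path.
class StateSketch {
    static final int NEW = 0;
    static final int COMPLETING = 1;
    static final int NORMAL = 2;

    private final AtomicInteger state = new AtomicInteger(NEW);
    private Object outcome; // plain field, guarded by the state transitions

    /** Tries to complete with the given value; only the first caller succeeds. */
    boolean complete(Object value) {
        // Only one thread can win NEW -> COMPLETING.
        if (state.compareAndSet(NEW, COMPLETING)) {
            outcome = value;
            // No other thread can change state once it is COMPLETING,
            // so a cheaper lazy (ordered) write is enough for the final state.
            state.lazySet(NORMAL);
            return true;
        }
        return false;
    }

    int state() {
        return state.get();
    }

    /** Returns the outcome once the final state is visible, else null. */
    Object outcome() {
        return state.get() == NORMAL ? outcome : null;
    }

    public static void main(String[] args) {
        StateSketch s = new StateSketch();
        System.out.println(s.complete("first") + " " + s.complete("second") + " " + s.outcome());
    }
}
```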

5.2 Constructors and fields

    public FutureTask(Callable<V> callable) {
        if (callable == null)
            throw new NullPointerException();
        this.callable = callable;
        this.state = NEW;       // ensure visibility of callable
    }
    public FutureTask(Runnable runnable, V result) {
        this.callable = Executors.callable(runnable, result);
        this.state = NEW;       // ensure visibility of callable
    }

If you don't need a particular result, consider using a construction of the form:
Future<?> f = new FutureTask<Void>(runnable, null);

As the constructors show, a task's initial state is NEW.
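
A short sketch of the second constructor: the Runnable is wrapped through Executors.callable, so get() returns the fixed result passed in once run() has finished. RunnableResultDemo and runWithFixedResult are hypothetical names.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

// Hypothetical demo class for illustration only.
class RunnableResultDemo {

    /** Wraps a Runnable with a fixed result and returns "<side effect>:<result>". */
    static String runWithFixedResult() {
        StringBuilder log = new StringBuilder();

        // FutureTask(Runnable, V): the task yields "fixed" after the Runnable has run.
        FutureTask<String> task = new FutureTask<>(() -> log.append("ran"), "fixed");

        boolean doneBefore = task.isDone(); // still NEW, so not done
        task.run();                         // executed on the calling thread here
        try {
            return doneBefore + ":" + log + ":" + task.get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(runWithFixedResult());
    }
}
```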

    /** The underlying callable; nulled out after running */
    private Callable<V> callable;
    /** The result to return or exception to throw from get() */
    private Object outcome; // non-volatile, protected by state reads/writes
    /** The thread running the callable; CASed during run() */
    private volatile Thread runner;
    /** Treiber stack of waiting threads */
    private volatile WaitNode waiters;

5.3 run

    public void run() {
        if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                         null, Thread.currentThread()))
            return;
        try {
            Callable<V> c = callable;
            if (c != null && state == NEW) {
                V result;
                boolean ran;
                try {
                    result = c.call();
                    ran = true;
                } catch (Throwable ex) {
                    result = null;
                    ran = false;
                    setException(ex);
                }
                if (ran)
                    set(result);
            }
        } finally {
            // runner must be non-null until state is settled to
            // prevent concurrent calls to run()
            runner = null;
            // state must be re-read after nulling runner to prevent
            // leaked interrupts
            int s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
    }

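The runner field serves as a lock: only the thread that wins the CAS from null to itself executes the task, which prevents concurrent calls to run. The outcome is then recorded by set or setException: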

    protected void set(V v) {
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            outcome = v;
            UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
            finishCompletion();
        }
    }
    protected void setException(Throwable t) {
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            outcome = t;
            UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
            finishCompletion();
        }
    }

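As shown above, run has two state-transition paths:

NEW -> COMPLETING -> NORMAL (via set, when call() returns normally)
NEW -> COMPLETING -> EXCEPTIONAL (via setException, when call() throws)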
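
Both outcomes of run are visible through get(): a normal return yields the value, while an exception thrown by call() is rethrown wrapped in an ExecutionException. The class RunPathsDemo and the method outcomeOf in this sketch are hypothetical names.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

// Hypothetical demo class for illustration only.
class RunPathsDemo {

    /** Runs the body in a FutureTask and describes which path was taken. */
    static String outcomeOf(Callable<String> body) {
        FutureTask<String> task = new FutureTask<>(body);
        task.run(); // either set(result) or setException(ex) is called inside
        try {
            return "NORMAL:" + task.get();
        } catch (ExecutionException e) {
            // The original exception is available as the cause.
            return "EXCEPTIONAL:" + e.getCause().getMessage();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(outcomeOf(() -> "ok"));
        System.out.println(outcomeOf(() -> { throw new IllegalStateException("boom"); }));
    }
}
```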

About finishCompletion, which removes and wakes all waiting threads, invokes done(), and nulls out callable:

    private void finishCompletion() {
        // assert state > COMPLETING;
        for (WaitNode q; (q = waiters) != null;) {
            if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
                for (;;) {
                    Thread t = q.thread;
                    if (t != null) {
                        q.thread = null;
                        LockSupport.unpark(t);
                    }
                    WaitNode next = q.next;
                    if (next == null)
                        break;
                    q.next = null; // unlink to help gc
                    q = next;
                }
                break;
            }
        }

        done();

        callable = null;        // to reduce footprint
    }
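
The waiters list is a Treiber stack: nodes are pushed with a CAS on the head, and finishCompletion detaches the whole stack with a single CAS before walking it. A minimal sketch of that structure, assuming AtomicReference instead of Unsafe and storing plain items rather than threads (TreiberStackSketch, push and drainAll are hypothetical names):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical, simplified lock-free stack for illustration only.
class TreiberStackSketch<T> {

    private static final class Node<T> {
        final T item;
        Node<T> next;

        Node(T item) {
            this.item = item;
        }
    }

    private final AtomicReference<Node<T>> head = new AtomicReference<>();

    /** Pushes an item by CASing a new node onto the head, retrying on contention. */
    void push(T item) {
        Node<T> node = new Node<>(item);
        do {
            node.next = head.get();
        } while (!head.compareAndSet(node.next, node));
    }

    /** Detaches the whole stack with one CAS, then walks it in LIFO order. */
    List<T> drainAll() {
        List<T> items = new ArrayList<>();
        for (Node<T> h; (h = head.get()) != null;) {
            if (head.compareAndSet(h, null)) {
                for (Node<T> n = h; n != null; n = n.next)
                    items.add(n.item);
                break;
            }
        }
        return items;
    }

    public static void main(String[] args) {
        TreiberStackSketch<Integer> stack = new TreiberStackSketch<>();
        stack.push(1);
        stack.push(2);
        stack.push(3);
        System.out.println(stack.drainAll());
    }
}
```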

5.4 get

    public V get() throws InterruptedException, ExecutionException {
        int s = state;
        if (s <= COMPLETING)
            s = awaitDone(false, 0L);
        return report(s);
    }
    private int awaitDone(boolean timed, long nanos)
        throws InterruptedException {
        final long deadline = timed ? System.nanoTime() + nanos : 0L;
        WaitNode q = null;
        boolean queued = false;
        for (;;) {
            if (Thread.interrupted()) {
                removeWaiter(q);
                throw new InterruptedException();
            }

            int s = state;
            if (s > COMPLETING) {
                if (q != null)
                    q.thread = null;
                return s;
            }
            else if (s == COMPLETING) // cannot time out yet
                Thread.yield();
            else if (q == null)
                q = new WaitNode();
            else if (!queued)
                queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                     q.next = waiters, q);
            else if (timed) {
                nanos = deadline - System.nanoTime();
                if (nanos <= 0L) {
                    removeWaiter(q);
                    return state;
                }
                LockSupport.parkNanos(this, nanos);
            }
            else
                LockSupport.park(this);
        }
    }
    static final class WaitNode {
        volatile Thread thread;
        volatile WaitNode next;
        WaitNode() { thread = Thread.currentThread(); }
    }
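
The timed branch of awaitDone is what the two-argument get relies on. In the sketch below nobody runs the task at first, so the timed get gives up with a TimeoutException; the task is still NEW afterwards and can be run and read normally. TimedGetDemo and timedGet are hypothetical names, and the 50 ms timeout is an arbitrary choice.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical demo class for illustration only.
class TimedGetDemo {

    /** Returns "<first attempt>,<second attempt>" for a task run only after a timeout. */
    static String timedGet() {
        FutureTask<String> task = new FutureTask<>(() -> "ready");
        try {
            String first;
            try {
                // Nobody has run the task yet, so the wait expires.
                first = task.get(50, TimeUnit.MILLISECONDS);
            } catch (TimeoutException e) {
                first = "timeout";
            }
            task.run(); // a timeout does not cancel the task
            String second = task.get(50, TimeUnit.MILLISECONDS);
            return first + "," + second;
        } catch (InterruptedException | ExecutionException | TimeoutException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(timedGet());
    }
}
```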

5.5 cancel

    public boolean cancel(boolean mayInterruptIfRunning) {
        if (!(state == NEW &&
              UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
                  mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
            return false;
        try {    // in case call to interrupt throws exception
            if (mayInterruptIfRunning) {
                try {
                    Thread t = runner;
                    if (t != null)
                        t.interrupt();
                } finally { // final state
                    UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
                }
            }
        } finally {
            finishCompletion();
        }
        return true;
    }
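
The NEW -> INTERRUPTING -> INTERRUPTED path can be exercised by cancelling a task that is already running. This sketch uses a CountDownLatch so that cancel(true) is only called after the body has started; CancelRunningDemo and cancelInterruptsRunner are hypothetical names, and the 60-second sleep merely stands in for long-running work.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.FutureTask;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical demo class for illustration only.
class CancelRunningDemo {

    /** Returns true if cancel(true) succeeded and the running body saw the interrupt. */
    static boolean cancelInterruptsRunner() {
        CountDownLatch started = new CountDownLatch(1);
        AtomicBoolean sawInterrupt = new AtomicBoolean();

        Callable<Void> body = () -> {
            started.countDown(); // signal that the task is running
            try {
                Thread.sleep(60_000); // stands in for long-running work
            } catch (InterruptedException e) {
                sawInterrupt.set(true); // delivered by cancel(true)
            }
            return null;
        };
        FutureTask<Void> task = new FutureTask<>(body);

        Thread runner = new Thread(task, "demo-runner");
        runner.start();
        try {
            started.await();                       // the task is now inside run()
            boolean cancelled = task.cancel(true); // interrupts the runner thread
            runner.join();
            return cancelled && task.isCancelled() && sawInterrupt.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(cancelInterruptsRunner());
    }
}
```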

6.Why FutureTask is no longer based on AQS

ThreadPoolExecutor executor = ...;  
executor.submit(task1).cancel(true);  
executor.submit(task2);  

Although the interrupt is aimed at task1, task2 may be the one that receives it.

The code of FutureTask.Sync.innerCancel in JDK 1.6:

boolean innerCancel(boolean mayInterruptIfRunning) {
    for (;;) {
        int s = getState();
        if (ranOrCancelled(s))
            return false;
        if (compareAndSetState(s, CANCELLED))
            break;
    }
    if (mayInterruptIfRunning) {
        Thread r = runner;
        if (r != null)  //1
            r.interrupt(); //2
    }
    releaseShared(0);
    done();
    return true;
}

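The following sequence delivers the interrupt to task2. The main thread cancels task1, passes the r != null check at //1, and is descheduled before reaching //2. Meanwhile the worker thread finishes task1 and starts running task2. When the main thread resumes, it calls r.interrupt() at //2, but r is the same worker thread, which is now executing task2.

Here is how the new version handles this leftover interrupt.
The cancel code: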

    public boolean cancel(boolean mayInterruptIfRunning) {
        if (!(state == NEW &&
              UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
                  mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
            return false;
        try {    // in case call to interrupt throws exception
            if (mayInterruptIfRunning) {
                try {
                    Thread t = runner;
                    if (t != null)
                        t.interrupt();
                } finally { // final state
                    UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
                }
            }
        } finally {
            finishCompletion();
        }
        return true;
    }

The end of run:

        } finally {
            // runner must be non-null until state is settled to
            // prevent concurrent calls to run()
            runner = null;
            // state must be re-read after nulling runner to prevent
            // leaked interrupts
            int s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
    /**
     * Ensures that any interrupt from a possible cancel(true) is only
     * delivered to a task while in run or runAndReset.
     */
    private void handlePossibleCancellationInterrupt(int s) {
        // It is possible for our interrupter to stall before getting a
        // chance to interrupt us.  Let's spin-wait patiently.
        if (s == INTERRUPTING)
            while (state == INTERRUPTING)
                Thread.yield(); // wait out pending interrupt

        // assert state == INTERRUPTED;

        // We want to clear any interrupt we may have received from
        // cancel(true).  However, it is permissible to use interrupts
        // as an independent mechanism for a task to communicate with
        // its caller, and there is no way to clear only the
        // cancellation interrupt.
        //
        // Thread.interrupted();
    }

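So run handles this case specially at the end.

In essence, while the main thread is cancelling a task, the thread running that task spin-waits until the main thread's interrupt operation has completed. The interrupt is therefore delivered while the worker is still inside run, not at some later point when it may already be executing the next task.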
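
The effect can be observed with a small experiment, assuming the FutureTask source shown above, in which the Thread.interrupted() call is commented out: the interrupt from cancel(true) has already been delivered by the time run returns, and the worker's interrupt status is left set. InterruptDeliveryDemo and interruptedAfterRun are hypothetical names.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.FutureTask;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical demo class for illustration only.
class InterruptDeliveryDemo {

    /** Returns the worker's interrupt status observed right after run() returned. */
    static boolean interruptedAfterRun() {
        CountDownLatch started = new CountDownLatch(1);
        AtomicBoolean stillInterrupted = new AtomicBoolean();

        Callable<Void> body = () -> {
            started.countDown();
            // Spin until the cancellation interrupt arrives; isInterrupted()
            // does not clear the flag.
            while (!Thread.currentThread().isInterrupted())
                Thread.yield();
            return null;
        };
        FutureTask<Void> task = new FutureTask<>(body);

        Thread worker = new Thread(() -> {
            task.run();
            // run() has returned: any interrupt from cancel(true) is already here.
            stillInterrupted.set(Thread.currentThread().isInterrupted());
        }, "demo-worker");
        worker.start();
        try {
            started.await();
            task.cancel(true);
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return stillInterrupted.get();
    }

    public static void main(String[] args) {
        System.out.println(interruptedAfterRun());
    }
}
```

Because the flag is not cleared by FutureTask itself, whoever calls run (for example a thread pool worker) has to deal with the remaining interrupt status before starting the next task.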
