Java设计模式之异步方法
2016-01-03 本文已影响798人
唐ikigai
AsyncCallback.java
/**
 * Callback notified when an asynchronous task finishes.
 *
 * <p>The interface has exactly one abstract method and is meant to be implemented with a lambda;
 * {@code @FunctionalInterface} makes the compiler enforce that shape.
 *
 * @param <T> type of the value produced by the task
 */
@FunctionalInterface
public interface AsyncCallback<T> {

  /**
   * Invoked once the task has finished, either successfully or with a failure.
   *
   * @param value the value produced by the task; {@code null} is passed when the task failed
   * @param ex empty when the task succeeded, otherwise holds the exception that made it fail
   */
  void onComplete(T value, Optional<Exception> ex);
}
AsyncExecutor.java
/**
* AsyncExecutor interface.
*
* Declares the operations for starting a task asynchronously and for collecting its result later
* through the returned {@link AsyncResult} handle.
*/
public interface AsyncExecutor {
/**
* Starts processing of the given task without a completion callback.
*
* @param task the task to evaluate
* @return handle through which the outcome of the task can be queried or awaited
*/
<T> AsyncResult<T> startProcess(Callable<T> task);
/**
* Starts processing of the given task and registers a callback for its completion.
*
* @param task the task to evaluate
* @param callback callback to notify when the task finishes; the ThreadAsyncExecutor
* implementation in this file accepts null here, meaning "no callback"
* @return handle through which the outcome of the task can be queried or awaited
*/
<T> AsyncResult<T> startProcess(Callable<T> task, AsyncCallback<T> callback);
/**
* Ends processing of an asynchronous task and returns its value. The ThreadAsyncExecutor
* implementation in this file waits for the task first if it has not completed yet.
*
* @param asyncResult handle previously returned by one of the startProcess methods
* @return the value produced by the task
* @throws ExecutionException if the task failed
* @throws InterruptedException if waiting for the task is interrupted
*/
<T> T endProcess(AsyncResult<T> asyncResult) throws ExecutionException, InterruptedException;
}
AsyncResult.java
/**
* AsyncResult interface.
*
* Handle to the outcome of a task that was started asynchronously.
*
* @param <T> type of the value produced by the task
*/
public interface AsyncResult<T> {
/**
* Tells whether the task has finished.
*
* @return true once the task has finished; in the implementation shown in this file that covers
* both successful and failed completion
*/
boolean isCompleted();
/**
* Returns the value produced by the task.
*
* NOTE: the implementation in this file throws IllegalStateException when this is called before
* the task has finished, so check isCompleted() or call await() first.
*
* @return the value of the completed task
* @throws ExecutionException if the task failed; the cause is the task's exception
*/
T getValue() throws ExecutionException;
/**
* Blocks the calling thread until the task has finished.
*
* @throws InterruptedException if the calling thread is interrupted while waiting
*/
void await() throws InterruptedException;
}
实现类:ThreadAsyncExecutor.java
/**
 * Implementation of async executor that creates a new thread for every task.
 *
 * <p>Each call to {@code startProcess} spawns a dedicated thread named {@code executor-N}. The
 * optional callback is invoked on that worker thread, not on the caller's thread.
 */
public class ThreadAsyncExecutor implements AsyncExecutor {

  /** Index for thread naming. */
  private final AtomicInteger idx = new AtomicInteger(0);

  /**
   * Starts the task on a new thread without a completion callback.
   *
   * @param task the task to evaluate
   * @return handle to the pending result
   */
  @Override
  public <T> AsyncResult<T> startProcess(Callable<T> task) {
    return startProcess(task, null);
  }

  /**
   * Starts the task on a new thread and notifies the callback once it has finished.
   *
   * @param task the task to evaluate
   * @param callback callback to invoke on completion, or {@code null} for none
   * @return handle to the pending result
   */
  @Override
  public <T> AsyncResult<T> startProcess(Callable<T> task, AsyncCallback<T> callback) {
    CompletableResult<T> result = new CompletableResult<>(callback);
    new Thread(() -> {
      T value;
      try {
        value = task.call();
      } catch (Exception ex) {
        result.setException(ex);
        return;
      }
      // Complete outside the try block: an exception thrown by the callback must not be
      // mistaken for a task failure, which would flip an already completed result to FAILED
      // and invoke the callback a second time.
      result.setValue(value);
    }, "executor-" + idx.incrementAndGet()).start();
    return result;
  }

  /**
   * Returns the value of the task, waiting for it to finish first if necessary.
   *
   * @param asyncResult handle returned by one of the {@code startProcess} methods
   * @return the value produced by the task
   * @throws ExecutionException if the task failed
   * @throws InterruptedException if the calling thread is interrupted while waiting
   */
  @Override
  public <T> T endProcess(AsyncResult<T> asyncResult)
      throws ExecutionException, InterruptedException {
    if (!asyncResult.isCompleted()) {
      asyncResult.await();
    }
    return asyncResult.getValue();
  }

  /**
   * Simple implementation of async result that allows completing it successfully with a value or
   * exceptionally with an exception. A really simplified version from its real life cousins
   * FutureTask and CompletableFuture.
   *
   * @see java.util.concurrent.FutureTask
   * @see java.util.concurrent.CompletableFuture
   */
  private static class CompletableResult<T> implements AsyncResult<T> {

    static final int RUNNING = 1;
    static final int FAILED = 2;
    static final int COMPLETED = 3;

    /** Monitor that threads blocked in {@link #await()} wait on. */
    final Object lock;
    final Optional<AsyncCallback<T>> callback;

    /**
     * Current state. It is volatile and always written after {@code value}/{@code exception},
     * so a thread that observes a finished state also observes the matching outcome.
     */
    volatile int state = RUNNING;
    T value;
    Exception exception;

    /**
     * Creates a result in the RUNNING state.
     *
     * @param callback callback to notify on completion, or {@code null} for none
     */
    CompletableResult(AsyncCallback<T> callback) {
      this.lock = new Object();
      this.callback = Optional.ofNullable(callback);
    }

    /**
     * Sets the value from successful execution and executes callback if available. Notifies any
     * thread waiting for completion, even when the callback throws.
     *
     * @param value value of the evaluated task
     */
    void setValue(T value) {
      this.value = value;
      this.state = COMPLETED;
      try {
        this.callback.ifPresent(ac -> ac.onComplete(value, Optional.<Exception>empty()));
      } finally {
        wakeWaiters();
      }
    }

    /**
     * Sets the exception from failed execution and executes callback if available. Notifies any
     * thread waiting for completion, even when the callback throws.
     *
     * @param exception exception of the failed task
     */
    void setException(Exception exception) {
      this.exception = exception;
      this.state = FAILED;
      try {
        this.callback.ifPresent(ac -> ac.onComplete(null, Optional.of(exception)));
      } finally {
        wakeWaiters();
      }
    }

    /** Wakes up every thread blocked in {@link #await()}. */
    private void wakeWaiters() {
      synchronized (lock) {
        lock.notifyAll();
      }
    }

    /**
     * Tells whether the task has finished.
     *
     * @return {@code true} once the task has either completed or failed
     */
    @Override
    public boolean isCompleted() {
      return state > RUNNING;
    }

    /**
     * Returns the value of the finished task.
     *
     * @return the value of the successfully completed task
     * @throws ExecutionException if the task failed; the cause is the task's exception
     * @throws IllegalStateException if the task has not finished yet
     */
    @Override
    public T getValue() throws ExecutionException {
      if (state == COMPLETED) {
        return value;
      } else if (state == FAILED) {
        throw new ExecutionException(exception);
      } else {
        throw new IllegalStateException("Execution not completed yet");
      }
    }

    /**
     * Blocks until the task has finished.
     *
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    @Override
    public void await() throws InterruptedException {
      synchronized (lock) {
        // Loop instead of a single check: Object.wait() may return spuriously, and the caller
        // relies on the task really being finished when this method returns.
        while (!isCompleted()) {
          lock.wait();
        }
      }
    }
  }
}
App.java
/**
 * Demo application: starts several tasks through an {@link AsyncExecutor}, does some work of its
 * own in the meantime and then collects the outcomes.
 */
public class App {

  /**
   * Program entry point
   */
  public static void main(String[] args) throws Exception {
    // the executor that runs every task asynchronously
    final AsyncExecutor asyncExecutor = new ThreadAsyncExecutor();

    // kick off five tasks with different delays; the last two also report through a callback
    final AsyncResult<Integer> pendingInt = asyncExecutor.startProcess(lazyval(10, 500));
    final AsyncResult<String> pendingText = asyncExecutor.startProcess(lazyval("test", 300));
    final AsyncResult<Long> pendingLong = asyncExecutor.startProcess(lazyval(50L, 700));
    final AsyncResult<Integer> pendingIntWithCallback =
        asyncExecutor.startProcess(lazyval(20, 400), callback("Callback result 4"));
    final AsyncResult<String> pendingTextWithCallback =
        asyncExecutor.startProcess(lazyval("callback", 600), callback("Callback result 5"));

    // pretend the main thread is busy while the tasks run in the background
    Thread.sleep(350);
    log("Some hard work done");

    // block until the first three tasks have delivered their values
    final Integer intValue = asyncExecutor.endProcess(pendingInt);
    final String textValue = asyncExecutor.endProcess(pendingText);
    final Long longValue = asyncExecutor.endProcess(pendingLong);

    // the callback tasks print their own outcome, so only wait for them to finish
    pendingIntWithCallback.await();
    pendingTextWithCallback.await();

    // report the collected values
    log("Result 1: " + intValue);
    log("Result 2: " + textValue);
    log("Result 3: " + longValue);
  }

  /**
   * Builds a task that sleeps for the given time, logs the value and then returns it.
   *
   * @param value value the task returns
   * @param delayMillis time in milliseconds the task sleeps before returning
   * @return the task
   */
  private static <T> Callable<T> lazyval(T value, long delayMillis) {
    return () -> {
      Thread.sleep(delayMillis);
      final String report = "Task completed with: " + value;
      log(report);
      return value;
    };
  }

  /**
   * Builds a callback that logs the outcome of a task under the given name.
   *
   * @param name label placed in front of the logged outcome
   * @return the callback
   */
  private static <T> AsyncCallback<T> callback(String name) {
    return (value, ex) -> {
      final String line = ex.isPresent()
          ? name + " failed: " + ex.map(Exception::getMessage).orElse("")
          : name + ": " + value;
      log(line);
    };
  }

  /**
   * Prints the message to standard output, prefixed with the name of the current thread.
   *
   * @param msg message to print
   */
  private static void log(String msg) {
    final String threadName = Thread.currentThread().getName();
    final String formatted = String.format("[%1$-10s] - %2$s", threadName, msg);
    System.out.println(formatted);
  }
}