Concurrency in Practice, Part 2: Thread Pools and CompletionService
2016-11-25
谜碌小孩
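Thread pools
Thread starvation deadlock
Tasks depend on other tasks, and the thread pool is not large enough to run them all at once.
In a single-threaded Executor, a task submits another task to the same Executor and then waits for its result.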
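The single-threaded case can be reproduced with a small sketch. The class and method names below (StarvationDeadlockDemo, outerTaskStarves) are made up for illustration, and a timeout is used on the inner get() only so that the demo terminates; with a plain get() the outer task would wait forever.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class StarvationDeadlockDemo {

    /**
     * Returns true if the outer task starved: it occupied the pool's only
     * worker thread while waiting for an inner task queued behind it.
     */
    static boolean outerTaskStarves(long timeoutMillis) {
        ExecutorService exec = Executors.newSingleThreadExecutor();
        try {
            Future<String> outer = exec.submit(() -> {
                // The inner task goes to the SAME executor. It cannot start
                // until the only worker thread (running this task) is free.
                Future<String> inner = exec.submit(() -> "inner done");
                try {
                    // A plain inner.get() would block forever here.
                    return inner.get(timeoutMillis, TimeUnit.MILLISECONDS);
                } catch (TimeoutException e) {
                    return "starved";
                }
            });
            return "starved".equals(outer.get());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            exec.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("starved = " + outerTaskStarves(200));
    }
}
```

Replacing newSingleThreadExecutor() with a pool of two or more threads lets the inner task run, which is why this kind of deadlock depends on the pool being too small for the task dependencies.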
Sizing the thread pool
int N_CPUS = Runtime.getRuntime().availableProcessors();
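Compute-intensive tasks: threads = N_CPUS + 1
Tasks that include I/O or other blocking operations: threads = N_CPUS * U_CPUS * (1 + W/C)
U_CPUS: target CPU utilization, between 0 and 1
W/C: ratio of wait time to compute time
Memory, file handles, socket handles, database connections: the pool size is also bounded by total available resource / per-task requirement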
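The sizing rules above can be written down as a few helper methods. This is only a sketch: the class name PoolSizing, the method names, and the rounding choice are assumptions made for illustration, and the utilization and W/C values have to come from your own measurements.

```java
class PoolSizing {

    /** Compute-intensive tasks: N_CPUS + 1. */
    static int cpuBoundThreads(int nCpus) {
        return nCpus + 1;
    }

    /** Blocking tasks: N_CPUS * U_CPUS * (1 + W/C), rounded, at least 1. */
    static int ioBoundThreads(int nCpus, double targetUtilization, double waitToComputeRatio) {
        if (nCpus <= 0 || targetUtilization <= 0 || targetUtilization > 1 || waitToComputeRatio < 0) {
            throw new IllegalArgumentException("invalid sizing parameters");
        }
        long threads = Math.round(nCpus * targetUtilization * (1 + waitToComputeRatio));
        return (int) Math.max(1, threads);
    }

    /** Upper bound imposed by a limited resource such as database connections. */
    static int resourceBound(int totalAvailable, int perTaskRequirement) {
        return totalAvailable / perTaskRequirement;
    }

    public static void main(String[] args) {
        int nCpus = Runtime.getRuntime().availableProcessors();
        // Hypothetical workload: 50% target utilization, tasks wait 3x as long as they compute.
        int wanted = ioBoundThreads(nCpus, 0.5, 3.0);
        // Hypothetical resource: 100 connections, one per task.
        int size = Math.min(wanted, resourceBound(100, 1));
        System.out.println("pool size = " + size);
    }
}
```

For example, 8 CPUs with a target utilization of 0.5 and W/C = 3 give 8 * 0.5 * 4 = 16 threads.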
Creating a thread pool
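public ThreadPoolExecutor(int corePoolSize, // core pool size
int maximumPoolSize, // maximum pool size
long keepAliveTime, // how long idle threads above the core size stay alive
TimeUnit unit,
BlockingQueue<Runnable> workQueue, // queue of tasks waiting to run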
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
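To see how the constructor parameters interact, here is a minimal sketch of a deliberately tiny pool (core 1, max 2, queue capacity 1) with the CallerRunsPolicy saturation handler. The class name BoundedPoolDemo and the chosen sizes are illustrative assumptions, not values from the article.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

class BoundedPoolDemo {

    /** Returns true if the task submitted to a saturated pool ran in the caller's thread. */
    static boolean overflowRunsInCaller() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1,                                  // corePoolSize
                2,                                  // maximumPoolSize
                30L, TimeUnit.SECONDS,              // keep-alive for the extra thread
                new ArrayBlockingQueue<Runnable>(1),// bounded task queue
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.CallerRunsPolicy());

        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await();                    // keep the worker busy
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        AtomicReference<Thread> ranOn = new AtomicReference<>();
        try {
            pool.execute(blocker); // starts the core thread
            pool.execute(blocker); // core thread busy, so the task is queued
            pool.execute(blocker); // queue full, so a second thread is started
            // Pool and queue are both full: CallerRunsPolicy runs this task here.
            pool.execute(() -> ranOn.set(Thread.currentThread()));
            return ranOn.get() == Thread.currentThread();
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("ran in caller = " + overflowRunsInCaller());
    }
}
```

The order matters: threads are created up to corePoolSize first, then tasks are queued, and only when the queue is full does the pool grow toward maximumPoolSize before the handler is invoked.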
Executors.newSingleThreadExecutor();
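Executors.newFixedThreadPool(100): the core size and the maximum size are both set to the given argument, and tasks wait in an unbounded LinkedBlockingQueue.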
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
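Executors.newCachedThreadPool(): the core size is 0, the maximum pool size is Integer.MAX_VALUE, and the queue is a SynchronousQueue, so tasks are handed directly to a thread instead of being stored.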
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
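The difference between the two configurations shows up when more tasks arrive than there are idle threads. The sketch below builds pools with the same constructor arguments as the two factory methods above and reports pool size and queue length while all tasks are blocked; the class and method names (PoolGrowthDemo, fixedLike, cachedLike) are hypothetical.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class PoolGrowthDemo {

    /** Same arguments as Executors.newFixedThreadPool(n). */
    static ThreadPoolExecutor fixedLike(int n) {
        return new ThreadPoolExecutor(n, n, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>());
    }

    /** Same arguments as Executors.newCachedThreadPool(). */
    static ThreadPoolExecutor cachedLike() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS,
                new SynchronousQueue<Runnable>());
    }

    /** Submits blocking tasks and returns {poolSize, queuedTasks}. */
    static int[] poolAndQueueSize(ThreadPoolExecutor pool, int tasks) {
        CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < tasks; i++) {
                pool.execute(() -> {
                    try {
                        release.await();            // hold the thread until released
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            return new int[] { pool.getPoolSize(), pool.getQueue().size() };
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        int[] fixed = poolAndQueueSize(fixedLike(2), 4);
        int[] cached = poolAndQueueSize(cachedLike(), 4);
        System.out.println("fixed:  threads=" + fixed[0] + " queued=" + fixed[1]);
        System.out.println("cached: threads=" + cached[0] + " queued=" + cached[1]);
    }
}
```

With four blocked tasks, the fixed-style pool stays at two threads and queues the rest, while the cached-style pool starts a thread per task and queues nothing.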
Converting sequential recursion to parallel recursion
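import java.util.Collection;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
public class SeToParallel {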
public <T> void sequentialRecursive(List<Node<T>> nodes, Collection<T> results){
for(Node<T> n:nodes){
results.add(n.compute());
sequentialRecursive(n.getChildren(),results);
}
}
public <T> void parallelRecursive(final Executor exec,List<Node<T>> nodes,final Collection<T> results){
for(final Node<T> n : nodes){
exec.execute(new Runnable() {
@Override
public void run() {
results.add(n.compute());
}
});
parallelRecursive(exec,n.getChildren(),results);
}
}
public <T> Collection<T> getParallelResults(List<Node<T>> nodes) throws InterruptedException {
ExecutorService exec = Executors.newCachedThreadPool();
Queue<T> resultQueue = new ConcurrentLinkedQueue<>();
parallelRecursive(exec,nodes,resultQueue);
exec.shutdown();
exec.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
return resultQueue;
}
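static class Node<T>{
private final T t;
// child nodes; pass an empty list for a leaf so the recursion terminates cleanly
private final List<Node<T>> children;
Node(T t, List<Node<T>> children) {
this.t = t;
this.children = children;
}
public List<Node<T>> getChildren() {
return children;
}
public T compute(){
return t;
}
}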
}
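CompletionService: Executor meets BlockingQueue
ExecutorCompletionService implements CompletionService and uses a BlockingQueue to hold the results of completed computations. When a task is submitted, it is wrapped in a QueueingFuture, a FutureTask subclass whose done() method adds the finished task to that queue.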
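A small sketch makes the "completion order, not submission order" behavior visible. The class name CompletionOrderDemo and the latch used to hold back the first task are assumptions for illustration; the latch simply guarantees that the task submitted first finishes last.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletionService;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class CompletionOrderDemo {

    /** Submits "slow" first and "fast" second, then returns results as they complete. */
    static List<String> resultsInCompletionOrder() {
        ExecutorService exec = Executors.newFixedThreadPool(2);
        CompletionService<String> cs = new ExecutorCompletionService<>(exec);
        CountDownLatch slowMayFinish = new CountDownLatch(1);
        List<String> order = new ArrayList<>();
        try {
            cs.submit(() -> {
                slowMayFinish.await();   // held back until the fast result is taken
                return "slow";
            });
            cs.submit(() -> "fast");

            order.add(cs.take().get()); // first completed Future, not first submitted
            slowMayFinish.countDown();
            order.add(cs.take().get());
            return order;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            exec.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(resultsInCompletionOrder());
    }
}
```

Because take() blocks until some task has finished, the caller never waits on a slow Future while a faster result is already available.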
Rendering a page progressively, drawing each image as soon as its download completes:
public class Renderer {
private final ExecutorService executorService;
public Renderer(ExecutorService executorService) {
this.executorService = executorService;
}
void renderPage(CharSequence source){
List<ImageInfo> info = scanForImageInfo(source);
CompletionService<ImageData> completionService = new ExecutorCompletionService<ImageData>(executorService);
for(final ImageInfo imageInfo:info){
completionService.submit(new Callable<ImageData>() {
@Override
public ImageData call() throws Exception {
return imageInfo.downloadImage();
}
});
}
renderText(source);
try {
for(int t = 0,n = info.size();t < n;t++){
Future<ImageData> f = completionService.take();
ImageData imageData = f.get();
renderImage(imageData);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
}