程序员首页投稿(暂停使用,暂停投稿)

并发编程实战二之线程池和CompletionService

2016-11-25  本文已影响0人  谜碌小孩

线程池

线程饥饿死锁

任务依赖于其他任务,线程池不够大
单线程,一个任务将另一个任务提交到同一个Executor。

设置线程池的大小

int N_CPUS = Runtime.getRuntime().availableProcessors();
计算密集型  thread = N_CPUS+1
包含I/O或其他阻塞操作的任务  thread = N_CPUS*U_CPUS(1 + W/C)
    U_CPUS  ——  基准负载
    W/C  ——  等待时间与计算时间的比值
内存、文件句柄、套接字句柄、数据库连接 —— 资源可用总量/每个任务的需求量

线程池的创建

public ThreadPoolExecutor(int corePoolSize,          //基本大小
                              int maximumPoolSize,        //最大
                              long keepAliveTime,        //线程存活时间
                              TimeUnit unit,            
                              BlockingQueue<Runnable> workQueue,    //线程队列
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }
Executors.newSingleThreadExecutor();

Executors.newFixedThreadPool(100)基本大小和最大大小设置为参数指定值
public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}

Executors.newCachedThreadPool()线程池最大大小设置为Integer.MAX_VALUE,队列为SynchronousQueue
public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

串行递归转并行递归

public class SeToParallel {
    public <T> void sequentialRecursive(List<Node<T>> nodes, Collection<T> results){
        for(Node<T> n:nodes){
            results.add(n.compute());
            sequentialRecursive(n.getChildren(),results);
        }
    }

    public <T> void parallelRecursive(final Executor exec,List<Node<T>> nodes,final Collection<T> results){
        for(final Node<T> n : nodes){
            exec.execute(new Runnable() {
                @Override
                public void run() {
                    results.add(n.compute());
                }
            });
            parallelRecursive(exec,n.getChildren(),results);
        }
    }

    public <T> Collection<T> getParallelResults(List<Node<T>> nodes) throws InterruptedException {
        ExecutorService exec = Executors.newCachedThreadPool();
        Queue<T> resultQueue = new ConcurrentLinkedQueue<>();
        parallelRecursive(exec,nodes,resultQueue);
        exec.shutdown();
        exec.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
        return resultQueue;
    }

        class Node<T>{

            private T t;
            private List<Node<T>> children;

            public List<Node<T>> getChildren() {
                return children;
            }

            public T compute(){
                return t;
            }
        }
}

CompletionService:Executor与BlockingQueue

ExecutorCompletionService实现了CompletionService,用BlockingQueue保存计算完成的结果。提交任务是,任务被包装成QueueingFuture.
页面逐渐渲染:

public class Renderer {
    private final ExecutorService executorService;

    public Renderer(ExecutorService executorService) {
        this.executorService = executorService;
    }
    
    void renderPage(CharSequence source){
        List<ImageInfo> info = scanForImageInfo(source);
            CompletionService<ImageData> completionService = new ExecutorCompletionService<ImageData>(executorService);
        for(final ImageInfo imageInfo:info){
            completionService.submit(new Callable<ImageData>() {
                @Override
                public ImageData call() throws Exception {
                    return imageInfo.downloadImage();
                }
            });
        }
        
        renderText(source);
        try {
            for(int t = 0,n = info.size();t < n;t++){
                Future<ImageData> f = completionService.take();
                ImageData imageData = f.get();
                renderImage(imageData);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
    }
}
上一篇下一篇

猜你喜欢

热点阅读