8. Using Thread Pools

2017-05-06  大海孤了岛

8.1 Implicit Couplings Between Tasks and Execution Policies

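Thread pools perform best when all tasks are of the same type and independent of one another. If long-running and short-running tasks are mixed in one pool, the pool may become "clogged" unless it is very large. If submitted tasks depend on other tasks, the pool may deadlock unless it is very large.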

8.1.1 Thread-Starvation Deadlock

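In a thread pool, tasks that depend on other tasks can deadlock. In a single-threaded executor, for instance, a task that submits another task to the same executor and then waits for its result will always deadlock: the second task sits in the queue until the first one finishes, while the first one waits for the second.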

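In the example below, RenderPageTask submits two tasks to the Executor to load the page header and footer, renders the page body, and then waits for the header and footer results: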

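import java.util.concurrent.*;

public class ThreadDeadLock {
    ExecutorService exec = Executors.newSingleThreadExecutor();

    public class RenderPageTask implements Callable<String> {
        public String call() throws Exception {
            Future<String> header, footer;
            header = exec.submit(new LoadFileTask("header.html"));
            footer = exec.submit(new LoadFileTask("footer.html"));
            String page = renderBody();
            // Deadlock: this task holds the pool's only thread while waiting
            // for subtasks that are queued behind it and can never run
            return header.get() + page + footer.get();
        }
    }

    // Hypothetical stand-ins so the example compiles; real code would read the files
    private String renderBody() { return "<body/>"; }

    static class LoadFileTask implements Callable<String> {
        private final String fileName;
        LoadFileTask(String fileName) { this.fileName = fileName; }
        public String call() { return "<" + fileName + ">"; }
    }
}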
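To watch the starvation happen, here is a minimal self-contained sketch (the class and method names are hypothetical): an outer task submits an inner task to the same pool and waits for it. A timed Future.get is used to detect the hang instead of blocking forever, and shutdownNow interrupts the stuck outer task so the pool can exit.

```java
import java.util.concurrent.*;

public class StarvationDemo {
    /** Returns true if the outer task could not finish within the timeout, i.e. it starved. */
    static boolean starves(int poolSize) throws InterruptedException, ExecutionException {
        ExecutorService exec = Executors.newFixedThreadPool(poolSize);
        try {
            Future<String> outer = exec.submit(() -> {
                Future<String> inner = exec.submit(() -> "inner"); // queued on the same pool
                return "outer+" + inner.get();                      // waits for a free worker
            });
            outer.get(1, TimeUnit.SECONDS);
            return false;
        } catch (TimeoutException e) {
            return true;
        } finally {
            exec.shutdownNow(); // interrupts the blocked outer task so the workers can exit
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(starves(1)); // one worker: outer occupies it while inner waits in the queue
        System.out.println(starves(2)); // two workers: the second one runs the inner task
    }
}
```

With a pool of one, the result is always a timeout; adding a second worker lets the inner task run, which is why a pool that is large enough (or unbounded) avoids this deadlock.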
8.1.2 Long-Running Tasks

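If tasks block for too long, the pool's responsiveness suffers even when no deadlock occurs. To mitigate this, tasks should wait for resources with a bounded timeout instead of waiting indefinitely. Most blocking methods in the platform libraries come in both untimed and timed versions, such as Thread.join, BlockingQueue.put (whose timed counterpart is offer with a timeout), CountDownLatch.await, and Selector.select. If a wait times out, the task can be marked as failed and then either aborted or put back on the queue to be executed later.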
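A minimal sketch of bounded waiting (the class and helper names are hypothetical): each helper uses the timed version of a JDK blocking call and returns false on timeout, so the caller can decide whether to fail or requeue the task instead of hanging.

```java
import java.util.concurrent.*;

public class TimedWaitDemo {
    // Wait at most timeoutMs for the latch; false means it never opened in time
    static boolean awaitOrFail(CountDownLatch latch, long timeoutMs) throws InterruptedException {
        return latch.await(timeoutMs, TimeUnit.MILLISECONDS);
    }

    // Timed alternative to BlockingQueue.put; false means the queue stayed full
    static <T> boolean putOrFail(BlockingQueue<T> q, T item, long timeoutMs) throws InterruptedException {
        return q.offer(item, timeoutMs, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        CountDownLatch neverOpened = new CountDownLatch(1);
        System.out.println(awaitOrFail(neverOpened, 50)); // prints false: timed out
        BlockingQueue<String> full = new ArrayBlockingQueue<>(1);
        full.put("a");
        System.out.println(putOrFail(full, "b", 50));     // prints false: queue still full
    }
}
```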

8.2 Sizing Thread Pools

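The ideal size of a thread pool depends on the types of tasks being submitted and the characteristics of the system it is deployed on. Above all, avoid the two extremes of "too big" and "too small". If the pool is too big, many threads compete for relatively scarce CPU and memory resources, which not only increases memory usage but can also exhaust resources. If it is too small, processors sit idle even though there is work to be done, which lowers throughput.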
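Java Concurrency in Practice gives a sizing heuristic for this: N_threads = N_cpu × U_cpu × (1 + W/C), where U_cpu is the target CPU utilization (between 0 and 1) and W/C is the ratio of time a task spends waiting to time it spends computing. Below is a minimal sketch of that arithmetic; the class and method names are hypothetical, and W/C is an estimate you would have to measure for your own workload.

```java
public class PoolSizing {
    /** Heuristic pool size: nCpu * targetUtilization * (1 + waitToComputeRatio), at least 1. */
    static int poolSize(int nCpu, double targetUtilization, double waitToComputeRatio) {
        if (targetUtilization <= 0 || targetUtilization > 1)
            throw new IllegalArgumentException("utilization must be in (0, 1]");
        return (int) Math.max(1, Math.round(nCpu * targetUtilization * (1 + waitToComputeRatio)));
    }

    public static void main(String[] args) {
        int nCpu = Runtime.getRuntime().availableProcessors();
        // Tasks that wait about 9x as long as they compute, at full CPU utilization: 10 threads per CPU
        System.out.println(poolSize(nCpu, 1.0, 9.0));
    }
}
```

For purely compute-bound tasks (W/C close to 0) the formula gives roughly one thread per CPU; the more time tasks spend blocked, the larger the pool can usefully be.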

8.3 Configuring ThreadPoolExecutor

[Figure: the ThreadPoolExecutor constructor]
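The most general ThreadPoolExecutor constructor takes seven parameters: core pool size, maximum pool size, keep-alive time and its unit, the work queue, a thread factory, and a rejected-execution handler. A minimal sketch of calling it; the class name and all values are illustrative only.

```java
import java.util.concurrent.*;

public class PoolConfigDemo {
    static ThreadPoolExecutor newBoundedPool() {
        return new ThreadPoolExecutor(
                2,                                          // corePoolSize: threads kept even when idle
                4,                                          // maximumPoolSize: upper bound on threads
                60L, TimeUnit.SECONDS,                      // keep-alive for threads above the core size
                new ArrayBlockingQueue<Runnable>(100),      // bounded work queue
                Executors.defaultThreadFactory(),           // how new threads are created
                new ThreadPoolExecutor.CallerRunsPolicy()); // what happens when the pool is saturated
    }

    public static void main(String[] args) {
        ThreadPoolExecutor pool = newBoundedPool();
        pool.execute(() -> System.out.println("running on " + Thread.currentThread().getName()));
        pool.shutdown();
    }
}
```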
8.3.1 Managing Queued Tasks

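ThreadPoolExecutor lets you supply a BlockingQueue to hold tasks that are waiting to be executed. There are three basic approaches to task queuing: an unbounded queue, a bounded queue, and synchronous handoff.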
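The three approaches map onto three JDK queue classes. A minimal sketch (the class and method names are hypothetical): LinkedBlockingQueue with no capacity is effectively unbounded, which is what newFixedThreadPool and newSingleThreadExecutor use; ArrayBlockingQueue is bounded; SynchronousQueue stores nothing and hands each task directly to a thread, which is how newCachedThreadPool is configured (here with a finite maximum instead).

```java
import java.util.concurrent.*;

public class QueueChoices {
    // Unbounded: tasks pile up without limit if they arrive faster than they are processed
    static ThreadPoolExecutor unbounded(int n) {
        return new ThreadPoolExecutor(n, n, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
    }

    // Bounded: at most `capacity` waiting tasks; beyond that the saturation policy applies
    static ThreadPoolExecutor bounded(int n, int capacity) {
        return new ThreadPoolExecutor(n, n, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(capacity));
    }

    // Synchronous handoff: no storage; a task is accepted only if a thread can take it
    static ThreadPoolExecutor handoff(int max) {
        return new ThreadPoolExecutor(0, max, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
    }

    public static void main(String[] args) {
        ThreadPoolExecutor[] pools = { unbounded(2), bounded(2, 10), handoff(4) };
        for (ThreadPoolExecutor p : pools) {
            System.out.println(p.getQueue().remainingCapacity()); // 2147483647, then 10, then 0
            p.shutdown();
        }
    }
}
```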

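Recommendation: bounding the thread pool or the work queue is appropriate only when tasks are independent. If tasks depend on one another, a bounded pool or queue can cause thread-starvation deadlock; in that case, use an unbounded pool such as newCachedThreadPool.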

8.3.2 Saturation Policies

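A saturation policy comes into play when a bounded work queue fills up (it is also applied to tasks submitted to an executor that has been shut down). The policy is set with setRejectedExecutionHandler, and the JDK provides these RejectedExecutionHandler implementations as nested classes of ThreadPoolExecutor:

- AbortPolicy (the default): execute throws the unchecked RejectedExecutionException, which the caller can catch.
- CallerRunsPolicy: the rejected task runs in the thread that called execute, which naturally slows down submission.
- DiscardPolicy: the newly submitted task is silently discarded.
- DiscardOldestPolicy: the oldest task in the queue (the one that would run next) is discarded, and the new task is resubmitted.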
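As a sketch of the caller-runs policy in action (the class and method names are hypothetical): the pool has one worker thread and room for one queued task. The first task occupies the worker, the second fills the queue, and the third, instead of being rejected, runs synchronously in the thread that submitted it.

```java
import java.util.concurrent.*;

public class CallerRunsDemo {
    // One worker, room for one queued task; anything beyond that runs in the caller's thread
    static ThreadPoolExecutor newPool() {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>(1));
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        return executor;
    }

    /** Returns the name of the thread that ran the task submitted while the pool was saturated. */
    static String whoRanOverflow() throws InterruptedException {
        ThreadPoolExecutor pool = newPool();
        CountDownLatch release = new CountDownLatch(1);
        String[] ranOn = new String[1];
        pool.execute(() -> {                 // occupies the only worker until released
            try { release.await(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
        });
        pool.execute(() -> { });             // fills the one-slot queue
        pool.execute(() -> ranOn[0] = Thread.currentThread().getName()); // saturated: caller runs it
        release.countDown();
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        return ranOn[0];
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(whoRanOverflow()); // prints "main"
    }
}
```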

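Example: using a Semaphore to throttle task submission. submitTask blocks once the number of submitted but unfinished tasks reaches the number of permits: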

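import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore;

// Limits the number of tasks that are queued or running at any one time to the semaphore's permits
public class BoundedExecutor {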
    private final Executor exec;
    private final Semaphore semaphore;
    
    public BoundedExecutor(Executor exec, Semaphore semaphore){
        this.exec = exec;
        this.semaphore = semaphore;
    }
    
    public void submitTask(final Runnable command) throws InterruptedException {
        semaphore.acquire();
        try{
            exec.execute(new Runnable() {
                @Override
                public void run() {
                    try{
                        command.run();
                    } finally {
                        semaphore.release();
                    }
                }
            });
        } catch (RejectedExecutionException e){
            // The executor refused the task, so return its permit (the task is dropped)
            semaphore.release();
        }
    }
}
8.3.4 Thread Factories

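Whenever a thread pool needs to create a thread, it does so through a thread factory. The default thread factory creates a new, non-daemon thread with no special configuration.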

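// The java.util.concurrent.ThreadFactory interface, reproduced for reference:
public interface ThreadFactory {
    Thread newThread(Runnable r);
}

import java.util.concurrent.ThreadFactory;

// Creates MyAppThread instances whose names start with the pool name
public class MyThreadFactory implements ThreadFactory {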
    private final String poolName;

    public MyThreadFactory(String poolName){
        this.poolName = poolName;
    }

    @Override
    public Thread newThread(Runnable r) {
        return new MyAppThread(r,poolName);
    }
}
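import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger;

// Custom thread: numbered name, logs uncaught exceptions, counts created and live threads
public class MyAppThread extends Thread {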
    public static final String DEFAULT_NAME = "MyAppThread";
    private static volatile boolean debugLifeCycle = false;
    private static final AtomicInteger created = new AtomicInteger();
    private static final AtomicInteger alive = new AtomicInteger();
    private static final Logger log = Logger.getAnonymousLogger();

    public MyAppThread(Runnable r) { this(r,DEFAULT_NAME); }

    public MyAppThread(Runnable r, String name){
        super(r,name + "-" + created.incrementAndGet());
        setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler(){

            @Override
            public void uncaughtException(Thread t, Throwable e) {
                log.log(Level.SEVERE, "UNCAUGHT in thread " + t.getName(), e);
            }
        });
    }

    public void run(){
        boolean debug = debugLifeCycle;
        if (debug) log.log(Level.FINE, "Created " + getName());
        try{
            alive.incrementAndGet();
            super.run();
        } finally {
            alive.decrementAndGet();
            if (debug) log.log(Level.FINE, "Exiting " + getName());
        }
    }

    public static int getThreadsCreated() { return created.get(); }
    public static int getThreadsAlive() { return  alive.get(); }
    public static boolean getDebug() { return debugLifeCycle; }
    public static void setDebug(boolean b) { debugLifeCycle = b; }
}
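Because ThreadFactory has a single method, a lighter-weight factory can also be written as a lambda and passed to the Executors factory methods. A minimal sketch (the class and method names are hypothetical) that produces numbered daemon threads:

```java
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

public class NamedThreadFactoryDemo {
    // Returns a factory producing daemon threads named "<poolName>-1", "<poolName>-2", ...
    static ThreadFactory named(String poolName) {
        AtomicInteger counter = new AtomicInteger();
        return r -> {
            Thread t = new Thread(r, poolName + "-" + counter.incrementAndGet());
            t.setDaemon(true); // daemon threads do not keep the JVM alive
            return t;
        };
    }

    public static void main(String[] args) throws Exception {
        ExecutorService exec = Executors.newFixedThreadPool(2, named("worker"));
        Future<String> name = exec.submit(() -> Thread.currentThread().getName());
        System.out.println(name.get()); // prints "worker-1"
        exec.shutdown();
    }
}
```

Making the threads daemons is a design choice for this sketch only; the default factory creates non-daemon threads, which keep the JVM running until the pool is shut down.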
8.3.5 Customizing ThreadPoolExecutor After Construction

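After a ThreadPoolExecutor has been constructed, most of the parameters passed to its constructor can still be changed through setters. If the Executor was created through one of the factory methods in Executors, you can cast the result to ThreadPoolExecutor to reach the setters: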

ExecutorService exec = Executors.newCachedThreadPool();
if(exec instanceof ThreadPoolExecutor)
    ((ThreadPoolExecutor) exec).setCorePoolSize(10);
else 
    throw new AssertionError("Oops,bad assumption");

8.4 Extending ThreadPoolExecutor

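ThreadPoolExecutor provides hooks that subclasses can override: beforeExecute, afterExecute, and terminated. beforeExecute and afterExecute are called in the thread that executes the task, and can be used to add logging, timing, monitoring, or statistics gathering. terminated is called when the pool completes shutdown, that is, after all tasks have finished and all worker threads have shut down. It can be used to release resources the Executor allocated during its lifecycle, send notifications, write log messages, or finalize statistics gathering.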

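import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Logger;

// Logs each task start, records per-task execution time, and logs the average on termination
public class TimingThreadPool extends ThreadPoolExecutor {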
    private final ThreadLocal<Long> startTime = new ThreadLocal<>();
    private final Logger log = Logger.getLogger("TimingThreadPool");
    private final AtomicLong numTasks = new AtomicLong();
    private final AtomicLong totalTime = new AtomicLong();

    public TimingThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
        log.fine(String.format("Thread %s: start %s",t,r));
        startTime.set(System.nanoTime());
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        try {
            long endTime = System.nanoTime();
            long taskTime = endTime - startTime.get();
            numTasks.incrementAndGet();
            totalTime.addAndGet(taskTime);
        } finally {
            super.afterExecute(r, t);
        }
    }

    @Override
    protected void terminated() {
        try {
            long n = numTasks.get();
            // Guard against division by zero when no task ever ran
            log.info(String.format("Terminated: avg time = %dns", n == 0 ? 0 : totalTime.get() / n));
        } finally {
            super.terminated();
        }
    }
}
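Another use of the afterExecute hook, as a minimal sketch (the class name is hypothetical): counting failed tasks. afterExecute receives the exception that terminated a task, but only for tasks passed to execute; a task passed to submit is wrapped in a FutureTask that captures its exception, so afterExecute sees a null Throwable for it.

```java
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

public class FailureCountingPool extends ThreadPoolExecutor {
    private final AtomicInteger failures = new AtomicInteger();

    public FailureCountingPool(int nThreads) {
        super(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        // Non-null only if the task itself threw; tasks given to submit() report via their Future instead
        if (t != null) failures.incrementAndGet();
    }

    public int failures() { return failures.get(); }

    public static void main(String[] args) throws InterruptedException {
        FailureCountingPool pool = new FailureCountingPool(2);
        pool.execute(() -> { throw new IllegalStateException("boom"); });
        pool.execute(() -> { });
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        System.out.println(pool.failures()); // prints 1
    }
}
```

The failing task still propagates to the worker thread's uncaught-exception handler, so a stack trace is printed to stderr; the pool replaces the dead worker as needed.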

8.5 Parallelizing Recursive Algorithms

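If the iterations of a loop are independent of one another, and you do not need to wait for all of them to complete before proceeding, you can use an Executor to turn a sequential loop into a parallel one.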

void processSequentially (List<Element> elements){
    for (Element e : elements)
        process(e);
}

void processInParallel (Executor exec, List<Element> elements){
    for (final Element e : elements)
        exec.execute(new Runnable(){
            public void run() { process(e); }
        });
}
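processInParallel returns as soon as all tasks have been submitted. When the caller does need every iteration to finish, one option is to shut the executor down and wait in awaitTermination. A self-contained sketch of that, in which squaring integers stands in for the unspecified process(e) and the class and method names are hypothetical:

```java
import java.util.*;
import java.util.concurrent.*;

public class ParallelLoop {
    // Each iteration is independent, so each becomes its own task; results go into a thread-safe map
    static Map<Integer, Integer> squaresInParallel(List<Integer> inputs) throws InterruptedException {
        ExecutorService exec = Executors.newFixedThreadPool(4);
        Map<Integer, Integer> results = new ConcurrentHashMap<>();
        for (final Integer x : inputs)
            exec.execute(() -> results.put(x, x * x));
        exec.shutdown();                            // accept no new tasks; submitted ones still run
        exec.awaitTermination(1, TimeUnit.MINUTES); // wait for all iterations to finish
        return results;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(squaresInParallel(Arrays.asList(1, 2, 3)).get(3)); // prints 9
    }
}
```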
public <T> void sequentialRecursive(List<Node<T>> nodes, Collection<T> results){
    for (Node<T> n : nodes){
        results.add(n.compute());
        sequentialRecursive(n.getChildren(), results);
    }
}

public <T> void parallelRecursive(final Executor exec, List<Node<T>> nodes,
 final Collection<T> results){
    for (final Node<T> n : nodes){
        exec.execute(new Runnable(){
            public void run(){ results.add(n.compute()); }
        });
        parallelRecursive(exec, n.getChildren(), results);
    }
}

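Note: what runs in parallel here is the calls to compute, not the traversal; the tree traversal itself is still sequential. The following method waits for all the results: it shuts the executor down and blocks in awaitTermination until every compute task has finished.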

public <T> Collection<T> getParallelResults(List<Node<T>> nodes)
        throws InterruptedException {
    ExecutorService exec = Executors.newCachedThreadPool();
    Queue<T> resultQueue = new ConcurrentLinkedQueue<T>();
    parallelRecursive(exec, nodes, resultQueue);
    exec.shutdown();  // accept no new tasks; already submitted ones still run
    exec.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);  // wait for all compute() tasks
    return resultQueue;
}

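A "puzzle" consists of an initial position, a goal position, and a set of rules that determine which moves are valid. The job is to search the puzzle space until either a solution is found or the whole space has been explored without finding one.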

The puzzle abstraction:

public interface Puzzle<P, M> {
    P initialPosition();
    boolean isGoal(P position);
    Set<M> legalMoves (P position);
    P move(P position, M move);
}
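To make the interface concrete, here is a hypothetical toy puzzle that is not part of the original: starting from 1, reach a target number using the moves "+1" and "*2". The interface is repeated so the sketch compiles on its own, and positions above the target are never generated, which keeps the search space finite.

```java
import java.util.*;

// Same shape as the Puzzle interface above, repeated so this sketch is self-contained
interface Puzzle<P, M> {
    P initialPosition();
    boolean isGoal(P position);
    Set<M> legalMoves(P position);
    P move(P position, M move);
}

// Hypothetical toy puzzle: from 1, reach `target` using "+1" and "*2"
class ReachNumberPuzzle implements Puzzle<Integer, String> {
    private final int target;

    ReachNumberPuzzle(int target) { this.target = target; }

    public Integer initialPosition() { return 1; }

    public boolean isGoal(Integer position) { return position == target; }

    public Set<String> legalMoves(Integer position) {
        // Never overshoot: a value above the target can never come back down
        Set<String> moves = new LinkedHashSet<>();
        if (position + 1 <= target) moves.add("+1");
        if (position * 2 <= target) moves.add("*2");
        return moves;
    }

    public Integer move(Integer position, String move) {
        return move.equals("*2") ? position * 2 : position + 1;
    }
}
```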

Sequential puzzle solver:

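import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;

// Depth-first search over the puzzle space, skipping positions already visited
public class SequentialPuzzleSolver<P,M> {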
    private final Puzzle<P,M> puzzle;
    private final Set<P> seen = new HashSet<P>();

    public SequentialPuzzleSolver(Puzzle<P,M> puzzle){
        this.puzzle = puzzle;
    }

    public List<M> solve(){
        P pos = puzzle.initialPosition();
        return search(new Node<P, M>(pos,null,null));
    }

    private List<M> search (Node<P,M> node){
        if (!seen.contains(node.pos)){
            seen.add(node.pos);
            if (puzzle.isGoal(node.pos))
                return node.asMoveList();
            for (M move : puzzle.legalMoves(node.pos)){
                P pos = puzzle.move(node.pos, move);
                Node<P,M> child = new Node<P, M>(pos, move, node);
                List<M> result = search(child);
                if (result != null)
                    return result;
            }
        }
        return null;
    }

    static class Node<P,M> {
        final P pos;
        final M move;
        final Node<P,M> prev;
        Node(P pos, M move, Node<P,M> prev){
            this.pos = pos;
            this.move = move;
            this.prev = prev;
        }

        List<M> asMoveList(){
            List<M> solution = new LinkedList<M>();
            for (Node<P,M> n = this; n.move != null; n = n.prev)
                solution.add(0,n.move);
            return solution;
        }
    }
}

Because computing one move is largely independent of computing the others, we can use parallelism to reduce search time when multiple processors are available.

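ValueLatch is a result-bearing latch built on CountDownLatch: getValue blocks until a value has been set, and only the first value set is kept: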

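import java.util.concurrent.CountDownLatch;

// One-shot result holder: the first setValue wins, and getValue blocks until a value is set
public class ValueLatch<T> {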
    private T value = null;
    private final CountDownLatch done = new CountDownLatch(1);

    public boolean isSet(){
        return (done.getCount() == 0);
    }

    public synchronized void setValue(T newValue){
        if (!isSet()){
            value = newValue;
            done.countDown();
        }
    }

    public T getValue() throws InterruptedException{
        done.await();
        synchronized (this){
            return value;
        }
    }
}

Concurrent puzzle solver:

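import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;

public class ConcurrentPuzzleSolver<P,M> {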
    private final Puzzle<P,M> puzzle;
    private final ExecutorService exec;
    private final ConcurrentHashMap<P,Boolean> seen;
    final ValueLatch<Node<P,M>> solution = new ValueLatch<Node<P, M>>();

    public ConcurrentPuzzleSolver(Puzzle<P,M> puzzle, ExecutorService exec){
        this.puzzle = puzzle;
        this.exec = exec;
        seen = new ConcurrentHashMap<P, Boolean>();
    }

    public List<M> solve() throws InterruptedException{
        try{
            P p = puzzle.initialPosition();
            exec.execute(newTask(p,null,null));
            // Block until a solution is found
            Node<P,M> solveNode = solution.getValue();
            return (solveNode == null) ? null : solveNode.asMoveList();
        } finally {
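            // Tasks still running may submit more work after this point; to avoid
            // RejectedExecutionException in worker threads, configure exec with
            // a discarding handler such as ThreadPoolExecutor.DiscardPolicy
            exec.shutdown();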
        }
    }

    protected Runnable newTask(P p, M m, Node<P,M> n){
        return new SolverTask(p,m,n);
    }

    class SolverTask extends Node<P,M> implements Runnable{

        SolverTask(P pos, M move, Node<P, M> prev) {
            super(pos, move, prev);
        }

        @Override
        public void run() {
            // A solution was already found, or this position was already visited
            if (solution.isSet() || seen.putIfAbsent(pos,true) != null) return;
            if (puzzle.isGoal(pos))
                solution.setValue(this);
            else
                for (M m : puzzle.legalMoves(pos))
                    exec.execute(newTask(puzzle.move(pos,m),m,this));
        }
    }

    static class Node<P,M> {
        final P pos;
        final M move;
        final Node<P,M> prev;
        Node(P pos, M move, Node<P,M> prev){
            this.pos = pos;
            this.move = move;
            this.prev = prev;
        }

        List<M> asMoveList(){
            List<M> solution = new LinkedList<M>();
            for (Node<P,M> n = this; n.move != null; n = n.prev)
                solution.add(0,n.move);
            return solution;
        }
    }

}

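There is a problem here, though: if the puzzle space contains no solution, solve() blocks forever, because getValue() never returns. To fix this, we can count the active tasks and set the solution to null when the count drops to zero: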

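import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;

// Solver that also terminates when no solution exists
public class PuzzleSolver<P,M> extends ConcurrentPuzzleSolver<P,M> {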
    private final AtomicInteger taskCount = new AtomicInteger(0);

    public PuzzleSolver(Puzzle<P, M> puzzle, ExecutorService exec) {
        super(puzzle, exec);
    }

    @Override
    protected Runnable newTask(P p, M m, Node<P, M> n) {
        return new CountingTask(p,m,n);
    }

    class CountingTask extends SolverTask{

        CountingTask(P pos, M move, Node<P, M> prev) {
            super(pos, move, prev);
            taskCount.incrementAndGet();
        }

        @Override
        public void run() {
            try{
                super.run();
            }finally {
                if (taskCount.decrementAndGet() == 0)
                    solution.setValue(null);
            }
        }
    }
}

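Note: the sequential solver performs a depth-first search, so it is limited by the stack size. The concurrent solver performs a breadth-first search, so it is not limited by the stack size, but it can still run out of memory if the set of positions waiting to be searched or already searched grows beyond the available memory.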
