线程池的创建和使用

2017-06-14  本文已影响0人  每天要吃三个小朋友

创建线程池

Executor框架

Executor框架
Executor接口里只有一个execute方法,执行Runnanble任务。
ExecutorService接口扩展了Executor,添加了线程生命周期的管理,提供终止任务、返回任务结果等方法。
AbstractExecutorService类实现了ExecutorService,提供了例如submit方法的默认实现逻辑。
ThreadPoolExecutor类继承了AbstractExecutorService,提供线程池的具体实现,实际使用时选它

构造方法

ThreadPoolExecutor的构造函数参数如下

public ThreadPoolExecutor(int corePoolSize,
        int maximumPoolSize,
        long keepAliveTime,
        TimeUnit unit,
        BlockingQueue<Runnable> workQueue,
        ThreadFactory threadFactory,
        RejectedExecutionHandler handler) {
}

corePoolSize 线程池的目标大小,刚创建时的线程池就这么大
maximumPoolSize 线程池的最大上限
keepAliveTime 线程存活时间。当线程数量大于maximumPoolSize,超出存活时间的线程会被回收

预设的线程池

在ThreadPoolExecutor基础上有几种预先设定好的线程池,分别是

public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
        0L, TimeUnit.MILLISECONDS,
        new LinkedBlockingQueue<Runnable>());
}

corePoolSizemaximumPoolSize都被传入同一数值,keepAliveTime0。因此,线程池一旦创建线程数量不会变。

public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
            0L, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<Runnable>()));
}

newFixedThreadPool的线程数固定为1的版本,并且线程数量不能被配置。

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
        60L, TimeUnit.SECONDS,
        new SynchronousQueue<Runnable>());
}

它会创建一个会缓存的线程池,线程数从0到无限,超时1分钟。当有空闲线程时就用,没有就创建新的。(这个好TM灵活啊)

源码

Timer的替代品,能定期或延迟执行任务。(貌似跟主题不相关,先摸了)

等待与饱和

等待队列

线程池大小或是系统资源是有限的,当任务的处理速度跟不上提交速度,就会有Runnanble被阻塞、等待执行,这就形成了阻塞队列BlockingQueue

它的几种实现方式

newChachedThreadPool能接受无限多任务,因此使用SynchronousQueue就很合理。
但是其他队列的话,如果被填满了不能接收新任务时怎么办?可用下面的饱和策略解决。

饱和策略

ThreadExecutorPool的饱和策略通过传入RejectedExecutionHandler实现

private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();
public static class AbortPolicy implements RejectedExecutionHandler {
    public AbortPolicy() { }
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        throw new RejectedExecutionException("Task " + r.toString() + " rejected from " + e.toString());
    }
}

AbortPolicy是默认实现,直接抛出一个RejectedExecutionException异常,让调用者自己处理。

public static class DiscardOldestPolicy implements RejectedExecutionHandler {
    public DiscardOldestPolicy() { }
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (!e.isShutdown()) {
            e.getQueue().poll();
            e.execute(r);
            }
        }
    }

DiscardPolicy的```rejectedExecution``直接是空方法,什么也不干,如果队列满了,后续的任务都抛弃掉。

public static class DiscardOldestPolicy implements RejectedExecutionHandler {
    public DiscardOldestPolicy() { }
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (!e.isShutdown()) {
            e.getQueue().poll();
            e.execute(r);
            }
        }
    }

DiscardOldestPolicy会把等待队列里最最旧的任务干掉,让新的任务进来

public static class CallerRunsPolicy implements RejectedExecutionHandler {
    public CallerRunsPolicy() { }
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (!e.isShutdown()) {
            r.run();
            }
        }
    }

CallerRunsPolicy会让主线程来执行这个任务,贼6!

线程工厂 ThreadFactory

线程都是工厂创建的,如果不为ThreadPoolExecutor设定线程工厂,就会默认用defaultThreadFactory

public static ThreadFactory defaultThreadFactory() {
    return new DefaultThreadFactory();
}
static class DefaultThreadFactory implements ThreadFactory {
    private static final AtomicInteger poolNumber = new AtomicInteger(1);
    private final ThreadGroup group;
    private final AtomicInteger threadNumber = new AtomicInteger(1);
    private final String namePrefix;

        DefaultThreadFactory() {
            SecurityManager s = System.getSecurityManager();
            group = (s != null) ? s.getThreadGroup() :
                Thread.currentThread().getThreadGroup();
                namePrefix = "pool-" +
                    poolNumber.getAndIncrement() +
                        "-thread-";
       }

          public Thread newThread(Runnable r) {
              Thread t = new Thread(group, r,
                      namePrefix + threadNumber.getAndIncrement(),
                                        0);

                       if (t.isDaemon())
                            t.setDaemon(false);
                        if (t.getPriority() != Thread.NORM_PRIORITY)
                            t.setPriority(Thread.NORM_PRIORITY);
                        return t;
       }
   }

打印线程名称出现的类似pool-1-thread-1可以在这里设定格式;创建的线程也都是非守护线程。如果需要更改,实现ThreadFactory后传给ThreadPoolExecutor即可。

使用线程池

未完成

上一篇下一篇

猜你喜欢

热点阅读