Java-常见的线程池

2021-02-02  本文已影响0人  zzq_nene

一、常见的线程池

FixedThreadPool、CachedThreadPool、ScheduledThreadPool、SingleThreadExecutor
这些常见的线程池,基本都是通过Executors中对应的new方法进行创建的。

1.FixedThreadPool

核心线程数固定,没有非核心线程,LinkedBlockingQueue 无界的Queue

    public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>(),
                                      threadFactory);
    }

2.CachedThreadPool

只有非核心线程,60s的空闲线程保活,SynchronousQueue,直接提交给线程运行。OkHttp中的线程池就是用的CachedThreadPool,核心线程数是0,最大线程数是Integer.MAX_VALUE,使用SynchronousQueue队列,SynchronousQueue队列是0容量的阻塞队列,那么就会直接交给线程执行

    public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>(),
                                      threadFactory);
    }

3.ScheduledThreadPool

有延迟执行和周期重复执行的线程池,new DelayedQueue ()
DelayedQueue : 优先队列 PriorityQueue 存储元素

    public static ScheduledExecutorService newScheduledThreadPool(
            int corePoolSize, ThreadFactory threadFactory) {
        return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
    }
    // ScheduledThreadPoolExecutor.java
    public ScheduledThreadPoolExecutor(int corePoolSize,
                                       ThreadFactory threadFactory) {
        super(corePoolSize, Integer.MAX_VALUE,
              DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
              new DelayedWorkQueue(), threadFactory);
    }

ScheduledThreadPoolExecutor继承自ThreadPoolExecutor

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }
ScheduledThreadPool实现周期性原理
    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                     long initialDelay,
                                                     long delay,
                                                     TimeUnit unit) {
        if (command == null || unit == null)
            throw new NullPointerException();
        if (delay <= 0L)
            throw new IllegalArgumentException();
        ScheduledFutureTask<Void> sft =
            new ScheduledFutureTask<Void>(command,
                                          null,
                                          triggerTime(initialDelay, unit),
                                          -unit.toNanos(delay),
                                          sequencer.getAndIncrement());
        RunnableScheduledFuture<Void> t = decorateTask(command, sft);
        sft.outerTask = t;
        // 将任务添加到DelayWorkQueue队列中
        delayedExecute(t);
        return t;
    }

获取DelayWorkQueue队列中的任务

        public RunnableScheduledFuture<?> take() throws InterruptedException {
            final ReentrantLock lock = this.lock;
            lock.lockInterruptibly();
            try {
                for (;;) {
                    RunnableScheduledFuture<?> first = queue[0];
                    if (first == null)
                        available.await();
                    else {
                        long delay = first.getDelay(NANOSECONDS);
                        if (delay <= 0L)
                            return finishPoll(first);
                        first = null; // don't retain ref while waiting
                        if (leader != null)
                            available.await();
                        else {
                            Thread thisThread = Thread.currentThread();
                            leader = thisThread;
                            try {
                                available.awaitNanos(delay);
                            } finally {
                                if (leader == thisThread)
                                    leader = null;
                            }
                        }
                    }
                }
            } finally {
                if (leader == null && queue[0] != null)
                    available.signal();
                lock.unlock();
            }
        }

执行任务

        public void run() {
            boolean periodic = isPeriodic();
            if (!canRunInCurrentRunState(periodic))
                cancel(false);
            else if (!periodic)
                super.run();
            else if (super.runAndReset()) {
                // 判断是否需要周期性执行,如果是
                // 则重置下一次要执行的时间
                setNextRunTime();
                reExecutePeriodic(outerTask);
            }
        }

        private void setNextRunTime() {
            long p = period;
            if (p > 0)
                time += p;
            else
                time = triggerTime(-p);
        }

放回到队列中

        public boolean offer(Runnable x) {
            if (x == null)
                throw new NullPointerException();
            RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                int i = size;
                if (i >= queue.length)
                    grow();
                size = i + 1;
                if (i == 0) {
                    queue[0] = e;
                    setIndex(e, 0);
                } else {
                    siftUp(i, e);
                }
                if (queue[0] == e) {
                    leader = null;
                    available.signal();
                }
            } finally {
                lock.unlock();
            }
            return true;
        }

主要就是通过修改ScheduledFutureTask的time值,而time是一个volatile修饰,time的值的修改则是通过优先级period的值来判断对time加上还是减去period值

4.SingleThreadExecutor

LinkedBlockingQueue
一个核心线程,不需要处理同步

    public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>(),
                                    threadFactory));
    }

二、饱和策略

RejectedExecutionHandler 饱和策略
(1)DiscardOldestPolicy:直接丢弃最老的那个任务,执行当前任务。如果执行程序尚未关闭,则位于工作队列头部的任务将被删除,然后重试执行程序(如果再次失败,则重复此过程) 意思就是在pool没有关闭的前提下首先丢掉缓存在队列中的最早的任务,然后重新尝试运行该任务。这个策略需要适当小心。
(2)AbortPolicy:直接抛出异常,默认的使用,即拒绝执行任务RejectedExecutionException。
(3)CallerRunsPolicy:让调用者线程去执行任务,即谁往线程池中提交任务,就由谁来执行这个任务。此策略提供简单的反馈控制机制,能够减缓新任务的提交速度
(4)DiscardPolicy:把最新提交的任务丢弃
如果这这四种拒绝策略都不满足,则自己实现RejectedExecutionHandler接口,自己定义一个拒绝策略。

上一篇下一篇

猜你喜欢

热点阅读