Java基础知识Java-多线程

知道这些线程池底层源码的知识,你也能和面试官扯半小时

2020-12-22  本文已影响0人  Java古德

一、什么是线程池

线程池是一种多线程处理形式,处理过程中将任务添加到队列,然后在创建线程后自动启动这些任务。线程池线程都是后台线程。每个线程都使用默认的堆栈大小,以默认的优先级运行,并处于多线程单元中。如果某个线程在托管代码中空闲(如正在等待某个事件),则线程池将插入另一个辅助线程来使所有处理器保持繁忙。如果所有线程池线程都始终保持繁忙,但队列中包含挂起的工作,则线程池将在一段时间后创建另一个辅助线程但线程的数目永远不会超过最大值。超过最大值的线程可以排队,但他们要等到其他线程完成后才启动。

二、为什么要使用线程池

当系统创建一个新的线程,因为它涉及到与操作系统的交互,所以它的成本是比较高的。经常创建和销毁、使用量特别大的资源,比如并发情况下的线程,对性能影响很大。当程序中需要使用存活周期很短的线程时,应该考虑使用线程池。

三、JDK源码中的线程池

// 创建一个线程池,该线程池重用在一个共享的无界队列上运行的固定数量的线程。
Creates a thread pool that reuses a fixed number of threads operating off a shared unbounded queue.  

// 在任何时候,在大多数情况下线程都是活动的处理任务。
At any point, at most {@code nThreads} threads will be active processing tasks.

// 如果在所有线程都处于活动状态时提交额外的任务,它们将在队列中等待,直到有一个线程可用为止。
If additional tasks are submitted when all threads are active, they will wait in the queue until a thread is available.

// 如果任何线程在关闭之前的执行过程中由于失败而终止,那么如果需要执行后续任务,则需要一个新线程来替代它
If any thread terminates due to a failure during execution prior to shutdown, a new one will take its place if needed to execute subsequent tasks. 

// 池中的线程将一直存在,直到显式结束
The threads in the pool will exist until it is explicitly.

四、使用线程池执行线程任务的步骤

Executors 是一个工厂类,主要用来创建 ExecutorService

public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

用 newFixedThreadPool() 这个静态方法用来创建一个可重用固定线程数的线程池。

示例代码:

​
// 创建一个实现Runnable接口的类 Thread1
class Thread1 implements Runnable {
    @Override
    public void run() {
        for (int i = 0; i <= 50; i++) {
            if (i % 2 == 0) {
                System.out.println(Thread.currentThread().getName() + "的i值为: " + i);
            }
        }
    }
}

// 创建一个实现Runnable接口的类 Thread2
class Thread2 implements Runnable {
    @Override
    public void run() {
        for (int i = 0; i <= 50; i++) {
            if (i % 2 != 0) {
                System.out.println(Thread.currentThread().getName() + "的i值为: " + i);
            }
        }
    }
}

public class ThreadPool {

    public static void main(String[] args) {
        // 创建一个固定线程数的的线程池
        ExecutorService pool = Executors.newFixedThreadPool(10);

        // 向线程中提交两个任务,执行指定的线程的操作。需要提供实现Runnable接口实现类的对象
        pool.execute(new Thread1());
        pool.execute(new Thread2());

        // 关闭线程池
        pool.shutdown();
    }
}

​

​
pool-1-thread-2的i值为: 1
pool-1-thread-2的i值为: 3
pool-1-thread-2的i值为: 5
pool-1-thread-2的i值为: 7
pool-1-thread-2的i值为: 9
pool-1-thread-2的i值为: 11
pool-1-thread-2的i值为: 13
pool-1-thread-2的i值为: 15
pool-1-thread-2的i值为: 17
pool-1-thread-2的i值为: 19
pool-1-thread-2的i值为: 21
pool-1-thread-2的i值为: 23
pool-1-thread-2的i值为: 25
pool-1-thread-2的i值为: 27
pool-1-thread-2的i值为: 29
pool-1-thread-2的i值为: 31
pool-1-thread-2的i值为: 33
pool-1-thread-2的i值为: 35
pool-1-thread-2的i值为: 37
pool-1-thread-2的i值为: 39
pool-1-thread-2的i值为: 41
pool-1-thread-2的i值为: 43
pool-1-thread-2的i值为: 45
pool-1-thread-2的i值为: 47
pool-1-thread-2的i值为: 49
pool-1-thread-1的i值为: 0
pool-1-thread-1的i值为: 2
pool-1-thread-1的i值为: 4
pool-1-thread-1的i值为: 6
pool-1-thread-1的i值为: 8
pool-1-thread-1的i值为: 10
pool-1-thread-1的i值为: 12
pool-1-thread-1的i值为: 14
pool-1-thread-1的i值为: 16
pool-1-thread-1的i值为: 18
pool-1-thread-1的i值为: 20
pool-1-thread-1的i值为: 22
pool-1-thread-1的i值为: 24
pool-1-thread-1的i值为: 26
pool-1-thread-1的i值为: 28
pool-1-thread-1的i值为: 30
pool-1-thread-1的i值为: 32
pool-1-thread-1的i值为: 34
pool-1-thread-1的i值为: 36
pool-1-thread-1的i值为: 38
pool-1-thread-1的i值为: 40
pool-1-thread-1的i值为: 42
pool-1-thread-1的i值为: 44
pool-1-thread-1的i值为: 46
pool-1-thread-1的i值为: 48
pool-1-thread-1的i值为: 50

​

五、剖析ThreadPoolExecutor 底层源码

​
public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

​

1. 创建一个线程池时的参数

当提交一个任务到线程池的时候,线程池将会创建一个线程来执行任务。这个值的大小设置非常关键,设置过小将频繁地创建或销毁线程,设置过大则会造成资源浪费。

线程池新建线程的时候,如果当前线程总数小于corePoolSize,则新建的是核心线程;如果超过corePoolSize,则新建的是非核心线程。

如果队列 (workQueue) 满了,并且已创建的线程数小于maximumPoolSize,则线程池会进行创建新的线程执行任务。 如果
等待执行的线程数 大于 maximumPoolSize,缓存在队列中; 如果 maximumPoolSize 等于 corePoolSize,即是固定大小线程池。

当需要执行的任务很多,线程池的线程数大于核心池的大小时,keepAliveTime才起作用;

当一个非核心线程,如果不干活(闲置状态)的时长超过这个参数所设定的时长,就会被销毁掉

它的单位有:TimeUnit.DAYS,TimeUnit.HOURS,TimeUnit.MINUTES,TimeUnit.MILLISECONDS,TimeUnit.MICRODECONDS

维护着等待执行的 Runnable对象。当所有的核心线程都在干活时,新添加的任务会被添加到这个队列中等待处理,如果队列满了,则新建非核心线程执行任务。

它用来给每个创建出来的线程设置一个名字,就可以知道线程任务是由哪个线程工厂产生的。

线程数量大于最大线程数,当超过workQueue的任务缓存区上限的时候,就可以调用该策略,这是一种简单的限流保护。

从JDK 源码里面可以看到:

在这里插入图片描述

其中里面其设置的 corePoolSize(核心池的大小) 和 maximumPoolSize(最大线程数) 都是 nThreads,其设定的阻塞队列是无界的,则饱和策略将失效,所有请求将一直排队等待被执行,可能会产生内存溢出的风险。因此阿里巴约编码规范不推荐用Executors来创建ThreadPoolExecutor。

使用Executors创建线程池时要明确创建的阻塞队列是否有界。因此最好自己创建ThreadPoolExecutor。

​​ 在这里插入图片描述
 ThreadPoolExecutor pool1 = (ThreadPoolExecutor) pool;
 // 设置核心池的大小
 pool1.setCorePoolSize(15);
 // setkeepAliveTime()方法 设置线程没有任务时最多保持多长时间后会停止
 pool1.setKeepAliveTime(60000, TimeUnit.HOURS);

Executors的工厂方法主要就返回了ThreadPoolExecutor对象

继承结构:


在这里插入图片描述

ThreadPoolExecutor 继承了 AbstractExecutorService

public class ThreadPoolExecutor extends AbstractExecutorService {
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
        // 表示线程池数量的位数,很明显是29,Integer.SIZE=32
        private static final int COUNT_BITS = Integer.SIZE - 3;
         // 表示线程池最大数量,2^29 - 1
        private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

        // runState is stored in the high-order bits
        private static final int RUNNING    = -1 << COUNT_BITS;
        private static final int SHUTDOWN   =  0 << COUNT_BITS;
        private static final int STOP       =  1 << COUNT_BITS;
        private static final int TIDYING    =  2 << COUNT_BITS;
        private static final int TERMINATED =  3 << COUNT_BITS;

        // Packing and unpacking ctl
        private static int runStateOf(int c)     { return c & ~CAPACITY; }
        private static int workerCountOf(int c)  { return c & CAPACITY; }
        private static int ctlOf(int rs, int wc) { return rs | wc; }

        /*
         * Bit field accessors that don't require unpacking ctl.
         * These depend on the bit layout and on workerCount being never negative.
         */

        private static boolean runStateLessThan(int c, int s) {
            return c < s;
        }

        private static boolean runStateAtLeast(int c, int s) {
            return c >= s;
        }

        private static boolean isRunning(int c) {
            return c < SHUTDOWN;
        }

        /**
         * Attempts to CAS-increment the workerCount field of ctl.
         */
        private boolean compareAndIncrementWorkerCount(int expect) {
            return ctl.compareAndSet(expect, expect + 1);
        }

        /**
         * Attempts to CAS-decrement the workerCount field of ctl.
         */
        private boolean compareAndDecrementWorkerCount(int expect) {
            return ctl.compareAndSet(expect, expect - 1);
        }

        /**
         * Decrements the workerCount field of ctl. This is called only on
         * abrupt termination of a thread (see processWorkerExit). Other
         * decrements are performed within getTask.
         */
        private void decrementWorkerCount() {
            do {} while (! compareAndDecrementWorkerCount(ctl.get()));
        }

        /**
         * The queue used for holding tasks and handing off to worker
         * threads.  We do not require that workQueue.poll() returning
         * null necessarily means that workQueue.isEmpty(), so rely
         * solely on isEmpty to see if the queue is empty (which we must
         * do for example when deciding whether to transition from
         * SHUTDOWN to TIDYING).  This accommodates special-purpose
         * queues such as DelayQueues for which poll() is allowed to
         * return null even if it may later return non-null when delays
         * expire.
         */
         // 用于存放线程任务的阻塞队列
        private final BlockingQueue<Runnable> workQueue;

        /**
         * Lock held on access to workers set and related bookkeeping.
         * While we could use a concurrent set of some sort, it turns out
         * to be generally preferable to use a lock. Among the reasons is
         * that this serializes interruptIdleWorkers, which avoids
         * unnecessary interrupt storms, especially during shutdown.
         * Otherwise exiting threads would concurrently interrupt those
         * that have not yet interrupted. It also simplifies some of the
         * associated statistics bookkeeping of largestPoolSize etc. We
         * also hold mainLock on shutdown and shutdownNow, for the sake of
         * ensuring workers set is stable while separately checking
         * permission to interrupt and actually interrupting.
         */
         // 重入锁
        private final ReentrantLock mainLock = new ReentrantLock();

        /**
         * Set containing all worker threads in pool. Accessed only when
         * holding mainLock.
         */
          // 线程池当中的线程集合,只有当拥有mainLock锁的时候,才可以进行访问
        private final HashSet<Worker> workers = new HashSet<Worker>();

        /**
         * Wait condition to support awaitTermination
         */
         // 等待条件支持终止
        private final Condition termination = mainLock.newCondition();

        /**
         * Tracks largest attained pool size. Accessed only under
         * mainLock.
         */
        private int largestPoolSize;

        /**
         * Counter for completed tasks. Updated only on termination of
         * worker threads. Accessed only under mainLock.
         */
        private long completedTaskCount;

        /*
         * All user control parameters are declared as volatiles so that
         * ongoing actions are based on freshest values, but without need
         * for locking, since no internal invariants depend on them
         * changing synchronously with respect to other actions.
         */

        /**
         * Factory for new threads. All threads are created using this
         * factory (via method addWorker).  All callers must be prepared
         * for addWorker to fail, which may reflect a system or user's
         * policy limiting the number of threads.  Even though it is not
         * treated as an error, failure to create threads may result in
         * new tasks being rejected or existing ones remaining stuck in
         * the queue.
         *
         * We go further and preserve pool invariants even in the face of
         * errors such as OutOfMemoryError, that might be thrown while
         * trying to create threads.  Such errors are rather common due to
         * the need to allocate a native stack in Thread.start, and users
         * will want to perform clean pool shutdown to clean up.  There
         * will likely be enough memory available for the cleanup code to
         * complete without encountering yet another OutOfMemoryError.
         */
         // 创建新线程的线程工厂
        private volatile ThreadFactory threadFactory;

        /**
         * Handler called when saturated or shutdown in execute.
         */
         // 饱和策略
        private volatile RejectedExecutionHandler handler;

总结

ctl是主要的控制状态

  • workerCount:表示有效的线程数目
  • runState:表示线程池里线程的运行状态

2. 重要方法

① execute() 方法

源码剖析

// 调用execute方法将线程提交到线程池中 
 public void execute(Runnable command) {
        // 如果执行的任务为空,则会抛出空指针异常
        if (command == null)
            throw new NullPointerException();
        /*
         * Proceed in 3 steps:
         *
         * 1\. If fewer than corePoolSize threads are running, try to
         * start a new thread with the given command as its first
         * task.  The call to addWorker atomically checks runState and
         * workerCount, and so prevents false alarms that would add
         * threads when it shouldn't, by returning false.
         *
         * 2\. If a task can be successfully queued, then we still need
         * to double-check whether we should have added a thread
         * (because existing ones died since last checking) or that
         * the pool shut down since entry into this method. So we
         * recheck state and if necessary roll back the enqueuing if
         * stopped, or start a new thread if there are none.
         *
         * 3\. If we cannot queue task, then we try to add a new
         * thread.  If it fails, we know we are shut down or saturated
         * and so reject the task.
         */
         // 获取线程池的控制状态
        int c = ctl.get();
        // 如果workerCount值小于corePoolSize
        if (workerCountOf(c) < corePoolSize) {
            // 添加任务到worker集合当中,成功的话返回
            if (addWorker(command, true))
                return;
             // 如果失败,再次获取线程池的控制状态
            c = ctl.get();
        }
        // 如果corePoolSize已经满了,则需要加入到阻塞队列
        // 判断线程池的状态以及是否可以往阻塞队列中继续添加runnable
        if (isRunning(c) && workQueue.offer(command)) {
            // 获取线程池的状态
            int recheck = ctl.get();
             // 再次检查状态,线程池不处于RUNNING状态,将任务从workQueue队列中移除
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        // 如果此时队列已满,则会采取相应的拒绝策略
        else if (!addWorker(command, false))
            reject(command);
    }

总结

  • 往corePoolSize中加入任务进行执行
  • 当corePoolSize满时往阻塞队列中加入任务
  • 阻塞队列满时并且maximumPoolSize已满,则采取相应的拒绝策略
② addWorker()方法

源码剖析

private boolean addWorker(Runnable firstTask, boolean core) {
        // 外部循环标志
        retry:
        for (;;) {
            // 获取线程池的状态
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                // 获取workerCount
                int wc = workerCountOf(c);
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                // 跳出循环    
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            // 实例化worker对象
            w = new Worker(firstTask);
            // 获取worker的线程
            final Thread t = w.thread;
            // 如果获取的线程不为空
            if (t != null) {
                // 初始线程池的锁
                final ReentrantLock mainLock = this.mainLock;
                 // 得到锁
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    // 获取锁后再次检查,获取线程池runState
                    int rs = runStateOf(ctl.get());

                    // 判断
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                         // 检查线程是否启动
                        if (t.isAlive()) // precheck that t is startable
                            // 如果未启动存活,则抛出异常
                            throw new IllegalThreadStateException();
                        // 往corePoolSize中加入任务    
                        workers.add(w);
                        // 获取workers集合的大小
                        int s = workers.size();
                         // 如果大小超过largestPoolSize
                        if (s > largestPoolSize)
                            // 重新设置线程池拥有最大线程数的大小
                            largestPoolSize = s;
                        // 改变状态    
                        workerAdded = true;
                    }
                } finally {
                 // 释放锁
                    mainLock.unlock();
                }
                if (workerAdded) {
                    // 运行
                    t.start();
                    // 改变状态
                    workerStarted = true;
                }
            }
        } finally {
             // 如果worker没有启动成功
            if (! workerStarted)
                addWorkerFailed(w);
        }
        // 返回worker是否成功启动的标记
        return workerStarted;
    }

六、使用线程池的好处

作者:扬帆向海
原文链接:https://zhangxy.blog.csdn.net/article/details/105999361

上一篇 下一篇

猜你喜欢

热点阅读