Java 并发

【Java 并发笔记】线程池相关整理(下)

2019-01-29  本文已影响1人  58bc06151329

文前说明

作为码农中的一员,需要不断的学习,我工作之余将一些分析总结和学习笔记写成博客与大家一起交流,也希望采用这种方式记录自己的学习之旅。

本文仅供学习交流使用,侵权必删。
不用于商业目的,转载请注明出处。

接上一篇 【Java 并发笔记】线程池相关整理(上)

2.3 ThreadPoolExecutor

ThreadPoolExecutor 结构
public class ThreadPoolExecutor extends AbstractExecutorService {
    .....
    public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
            BlockingQueue<Runnable> workQueue);
 
    public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
            BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory);
 
    public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
            BlockingQueue<Runnable> workQueue,RejectedExecutionHandler handler);
 
    public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
        BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler);
    ...
}

构造器参数

参数 说明
corePoolSize 核心线程数量。
maximumPoolSize 最大线程数量。
allowCoreThreadTimeOut 是否允许线程超时(设置为 true 时与 keepAliveTime,unit 一起起作用)
keepAliveTime 线程存活时间
unit 单位
workQueue 存储任务的阻塞队列(缓存队列)。
handler 拒绝处理任务类(默认:AbortPolicy 会抛异常)。
threadFactory 线程工厂(默认:DefaultThreadFactory)。

corePoolSize

maximumPoolSize

线程池结构

keepAliveTime

unit

单位 说明
TimeUnit.DAYS
TimeUnit.HOURS 小时
TimeUnit.MINUTES 分钟
TimeUnit.SECONDS
TimeUnit.MILLISECONDS 毫秒
TimeUnit.MICROSECONDS 微妙
TimeUnit.NANOSECONDS 纳秒

workQueue

阻塞队列 说明
ArrayBlockingQueue 一个由数组结构组成的有界阻塞队列。
LinkedBlockingQueue 一个由链表结构组成的有界阻塞队列。
PriorityBlockingQueue 一个支持优先级排序的无界阻塞队列。
DelayQueue 一个使用优先级队列实现的无界阻塞队列。
SynchronousQueue 一个不存储元素的阻塞队列。
LinkedTransferQueue 一个由链表结构组成的无界阻塞队列。
LinkedBlockingDeque 一个由链表结构组成的双向阻塞队列。

handle

策略 说明
ThreadPoolExecutor.AbortPolicy 丢弃任务并抛出 RejectedExecutionException 异常。
ThreadPoolExecutor.DiscardPolicy 丢弃任务,但是不抛出异常。
ThreadPoolExecutor.DiscardOldestPolicy 丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
ThreadPoolExecutor.CallerRunsPolicy 由调用线程处理该任务。

threadFactory

工作顺序


类的属性

// 阻塞队列
private final BlockingQueue<Runnable> workQueue;
// 可重入锁
private final ReentrantLock mainLock = new ReentrantLock();
// 存放工作线程集合
private final HashSet<Worker> workers = new HashSet<Worker>();
// 终止条件
private final Condition termination = mainLock.newCondition();
// 最大线程池容量
private int largestPoolSize;
// 已完成任务数量
private long completedTaskCount;
// 线程工厂
private volatile ThreadFactory threadFactory;
// 拒绝执行处理器
private volatile RejectedExecutionHandler handler;
// 线程等待运行时间
private volatile long keepAliveTime;
// 是否运行核心线程超时
private volatile boolean allowCoreThreadTimeOut;
// 核心池的大小
private volatile int corePoolSize;
// 最大线程池大小
private volatile int maximumPoolSize;
// 默认拒绝执行处理器
private static final RejectedExecutionHandler defaultHandler =
    new AbortPolicy();
//
private static final RuntimePermission shutdownPerm =
    new RuntimePermission("modifyThread");

ctl 变量

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

// runState is stored in the high-order bits
private static final int RUNNING    = -1 << COUNT_BITS;//-00100000000000000000000000000000
private static final int SHUTDOWN   =  0 << COUNT_BITS;//00000000000000000000000000000000
private static final int STOP       =  1 << COUNT_BITS;//00100000000000000000000000000000
private static final int TIDYING    =  2 << COUNT_BITS;//01000000000000000000000000000000
private static final int TERMINATED =  3 << COUNT_BITS;//01100000000000000000000000000000
状态 10 进制值 二进制值
RUNNING -536870912 -00100000000000000000000000000000
SHUTDOWN 0 0
STOP 536870912 00100000000000000000000000000000
TIDYING 1073741824 01000000000000000000000000000000
TERMINATED 1610612736 01100000000000000000000000000000
/**
 * 这个方法用于取出runState的值 因为CAPACITY值为:00011111111111111111111111111111
 * ~为按位取反操作,则~CAPACITY值为:11100000000000000000000000000000
 * 再同参数做&操作,就将低29位置0了,而高3位还是保持原先的值,也就是runState的值
 * 
 * @param c
 *            该参数为存储runState和workerCount的int值
 * @return runState的值
 */
private static int runStateOf(int c) {
    return c & ~CAPACITY;
}


/**
 * 这个方法用于取出workerCount的值
 * 因为CAPACITY值为:00011111111111111111111111111111,所以&操作将参数的高3位置0了
 * 保留参数的低29位,也就是workerCount的值
 * 
 * @param c
 *            ctl, 存储runState和workerCount的int值
 * @return workerCount的值
 */
private static int workerCountOf(int c) {
    return c & CAPACITY;
}

/**
 * 将runState和workerCount存到同一个int中
 * “|”运算的意思是,假设rs的值是101000,wc的值是000111,则他们位或运算的值为101111
 * 
 * @param rs
 *            runState移位过后的值,负责填充返回值的高3位
 * @param wc
 *            workerCount移位过后的值,负责填充返回值的低29位
 * @return 两者或运算过后的值
 */
private static int ctlOf(int rs, int wc) {
    return rs | wc;
}

// 只有RUNNING状态会小于0
private static boolean isRunning(int c) {
    return c < SHUTDOWN;
}
线程池状态转换

2.3.1 Worker

private final class Worker
         extends AbstractQueuedSynchronizer
         implements Runnable{
     Worker(Runnable firstTask) {
         setState(-1); // inhibit interrupts until runWorker
         this.firstTask = firstTask;
         this.thread = getThreadFactory().newThread(this); // 创建线程
     }
     /** Delegates main run loop to outer runWorker  */
     public void run() {
         runWorker(this);
     }
     // ...
 }
// state只有0和1,互斥
protected boolean tryAcquire(int unused) {
    if (compareAndSetState(0, 1)) {
        setExclusiveOwnerThread(Thread.currentThread());
        return true;// 成功获得锁
    }
    // 线程进入等待队列
    return false;
}

protected boolean tryRelease(int unused) {
    setExclusiveOwnerThread(null);
    setState(0);
    return true;
}

execute 方法

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();

    int c = ctl.get();
    // 活动线程数 < corePoolSize
    if (workerCountOf(c) < corePoolSize) {
        // 直接启动新的线程。第二个参数true:addWorker中会重新检查workerCount是否小于corePoolSize
        if (addWorker(command, true))
            // 添加成功返回
            return;
        c = ctl.get();
    }
    // 活动线程数 >= corePoolSize
    // runState为RUNNING && 队列未满
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        // double check
        // 非RUNNING状态 则从workQueue中移除任务并拒绝
        if (!isRunning(recheck) && remove(command))
            reject(command);// 采用线程池指定的策略拒绝任务
        // 线程池处于RUNNING状态 || 线程池处于非RUNNING状态但是任务移除失败
        else if (workerCountOf(recheck) == 0)
            // 这行代码是为了SHUTDOWN状态下没有活动线程了,但是队列里还有任务没执行这种特殊情况。
            // 添加一个null任务是因为SHUTDOWN状态下,线程池不再接受新任务
            addWorker(null, false);

        // 两种情况:
        // 1.非RUNNING状态拒绝新的任务
        // 2.队列满了启动新的线程失败(workCount > maximumPoolSize)
    } else if (!addWorker(command, false))
        reject(command);
}

addWorker 方法

private boolean addWorker(Runnable firstTask, boolean core) {
    retry: for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);// 当前线程池状态

        // Check if queue empty only if necessary.
        // 这条语句等价:rs >= SHUTDOWN && (rs != SHUTDOWN || firstTask != null ||
        // workQueue.isEmpty())
        // 满足下列调价则直接返回false,线程创建失败:
        // rs > SHUTDOWN:STOP || TIDYING || TERMINATED 此时不再接受新的任务,且所有任务执行结束
        // rs = SHUTDOWN:firtTask != null 此时不再接受任务,但是仍然会执行队列中的任务
        // rs = SHUTDOWN:firtTask == null见execute方法的addWorker(null,
        // false),任务为null && 队列为空
        // 最后一种情况也就是说SHUTDONW状态下,如果队列不为空还得接着往下执行,为什么?add一个null任务目的到底是什么?
        // 看execute方法只有workCount==0的时候firstTask才会为null结合这里的条件就是线程池SHUTDOWN了不再接受新任务
        // 但是此时队列不为空,那么还得创建线程把任务给执行完才行。
        if (rs >= SHUTDOWN && !(rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty()))
            return false;

        // 走到这的情形:
        // 1.线程池状态为RUNNING
        // 2.SHUTDOWN状态,但队列中还有任务需要执行
        for (;;) {
            int wc = workerCountOf(c);
            if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            if (compareAndIncrementWorkerCount(c))// 原子操作递增workCount
                break retry;// 操作成功跳出的重试的循环
            c = ctl.get(); // Re-read ctl
            if (runStateOf(c) != rs)// 如果线程池的状态发生变化则重试
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }

    // wokerCount递增成功

    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        final ReentrantLock mainLock = this.mainLock;
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            // 并发的访问线程池workers对象必须加锁
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int c = ctl.get();
                int rs = runStateOf(c);

                // RUNNING状态 || SHUTDONW状态下清理队列中剩余的任务
                if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    // 将新启动的线程添加到线程池中
                    workers.add(w);
                    // 更新largestPoolSize
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            // 启动新添加的线程,这个线程首先执行firstTask,然后不停的从队列中取任务执行
            // 当等待keepAlieTime还没有任务执行则该线程结束。见runWoker和getTask方法的代码。
            if (workerAdded) {
                t.start();// 最终执行的是ThreadPoolExecutor的runWoker方法
                workerStarted = true;
            }
        }
    } finally {
        // 线程启动失败,则从wokers中移除w并递减wokerCount
        if (!workerStarted)
            // 递减wokerCount会触发tryTerminate方法
            addWorkerFailed(w);
    }
    return workerStarted;
}

runWorker 方法

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    // Worker的构造函数中抑制了线程中断setState(-1),所以这里需要unlock从而允许中断
    w.unlock();
    // 用于标识是否异常终止,finally中processWorkerExit的方法会有不同逻辑
    // 为true的情况:1.执行任务抛出异常;2.被中断。
    boolean completedAbruptly = true;
    try {
        // 如果getTask返回null那么getTask中会将workerCount递减,如果异常了这个递减操作会在processWorkerExit中处理
        while (task != null || (task = getTask()) != null) {
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted. This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP)))
                    && !wt.isInterrupted())
                wt.interrupt();
            try {
                // 任务执行前可以插入一些处理,子类重载该方法
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();// 执行用户任务
                } catch (RuntimeException x) {
                    thrown = x;
                    throw x;
                } catch (Error x) {
                    thrown = x;
                    throw x;
                } catch (Throwable x) {
                    thrown = x;
                    throw new Error(x);
                } finally {
                    // 和beforeExecute一样,留给子类去重载
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }

        completedAbruptly = false;
    } finally {
        // 结束线程的一些清理工作
        processWorkerExit(w, completedAbruptly);
    }
}

getTask 方法

private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?

    retry: for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        // 1.rs > SHUTDOWN 所以rs至少等于STOP,这时不再处理队列中的任务
        // 2.rs = SHUTDOWN 所以rs>=STOP肯定不成立,这时还需要处理队列中的任务除非队列为空
        // 这两种情况都会返回null让runWoker退出while循环也就是当前线程结束了,所以必须要decrement
        // wokerCount
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            // 递减workerCount值
            decrementWorkerCount();
            return null;
        }

        // 标记从队列中取任务时是否设置超时时间
        boolean timed; // Are workers subject to culling?

        // 1.RUNING状态
        // 2.SHUTDOWN状态,但队列中还有任务需要执行
        for (;;) {
            int wc = workerCountOf(c);

            // 1.core thread允许被超时,那么超过corePoolSize的的线程必定有超时
            // 2.allowCoreThreadTimeOut == false && wc >
            // corePoolSize时,一般都是这种情况,core thread即使空闲也不会被回收,只要超过的线程才会
            timed = allowCoreThreadTimeOut || wc > corePoolSize;

            // 从addWorker可以看到一般wc不会大于maximumPoolSize,所以更关心后面半句的情形:
            // 1. timedOut == false 第一次执行循环, 从队列中取出任务不为null方法返回 或者
            // poll出异常了重试
            // 2.timeOut == true && timed ==
            // false:看后面的代码workerQueue.poll超时时timeOut才为true,
            // 并且timed要为false,这两个条件相悖不可能同时成立(既然有超时那么timed肯定为true)
            // 所以超时不会继续执行而是return null结束线程。(重点:线程是如何超时的???)
            if (wc <= maximumPoolSize && !(timedOut && timed))
                break;

            // workerCount递减,结束当前thread
            if (compareAndDecrementWorkerCount(c))
                return null;
            c = ctl.get(); // Re-read ctl
            // 需要重新检查线程池状态,因为上述操作过程中线程池可能被SHUTDOWN
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }

        try {
            // 1.以指定的超时时间从队列中取任务
            // 2.core thread没有超时
            Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take();
            if (r != null)
                return r;
            timedOut = true;// 超时
        } catch (InterruptedException retry) {
            timedOut = false;// 线程被中断重试
        }
    }
}

processWorkerExit 方法

private void processWorkerExit(Worker w, boolean completedAbruptly) {
    if (completedAbruptly) // 如果被中断,则需要减少workCount    // If abrupt, then workerCount wasn't adjusted
        decrementWorkerCount();
    // 获取可重入锁
    final ReentrantLock mainLock = this.mainLock;
    // 获取锁
    mainLock.lock();
    try {
        // 将worker完成的任务添加到总的完成任务中
        completedTaskCount += w.completedTasks;
        // 从workers集合中移除该worker
        workers.remove(w);
    } finally {
        // 释放锁
        mainLock.unlock();
    }
    // 尝试终止
    tryTerminate();
    // 获取线程池控制状态
    int c = ctl.get();
    if (runStateLessThan(c, STOP)) { // 小于STOP的运行状态
        if (!completedAbruptly) {
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            if (min == 0 && ! workQueue.isEmpty()) // 允许核心超时并且workQueue阻塞队列不为空
                min = 1;
            if (workerCountOf(c) >= min) // workerCount大于等于min
                // 直接返回
                return; // replacement not needed
        }
        // 添加worker
        addWorker(null, false);
    }
}

tryTerminate 方法

final void tryTerminate() {
    for (;;) {
        int c = ctl.get();
        // 以下状态直接返回:
        // 1.线程池还处于RUNNING状态
        // 2.SHUTDOWN状态但是任务队列非空
        // 3.runState >= TIDYING 线程池已经停止了或在停止了
        if (isRunning(c) || runStateAtLeast(c, TIDYING) || (runStateOf(c) == SHUTDOWN && !workQueue.isEmpty()))
            return;

        // 只能是以下情形会继续下面的逻辑:结束线程池。
        // 1.SHUTDOWN状态,这时不再接受新任务而且任务队列也空了
        // 2.STOP状态,当调用了shutdownNow方法

        // workerCount不为0则还不能停止线程池,而且这时线程都处于空闲等待的状态
        // 需要中断让线程“醒”过来,醒过来的线程才能继续处理shutdown的信号。
        if (workerCountOf(c) != 0) { // Eligible to terminate
            // runWoker方法中w.unlock就是为了可以被中断,getTask方法也处理了中断。
            // ONLY_ONE:这里只需要中断1个线程去处理shutdown信号就可以了。
            interruptIdleWorkers(ONLY_ONE);
            return;
        }

        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            // 进入TIDYING状态
            if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                try {
                    // 子类重载:一些资源清理工作
                    terminated();
                } finally {
                    // TERMINATED状态
                    ctl.set(ctlOf(TERMINATED, 0));
                    // 继续awaitTermination
                    termination.signalAll();
                }
                return;
            }
        } finally {
            mainLock.unlock();
        }
        // else retry on failed CAS
    }
}

shutdown 和 shutdownNow 方法

public void shutdown() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        // 检查shutdown权限
        checkShutdownAccess();
        // 设置线程池控制状态为SHUTDOWN
        advanceRunState(SHUTDOWN);
        // 中断空闲worker
        interruptIdleWorkers();
        // 调用shutdown钩子函数
        onShutdown(); // hook for ScheduledThreadPoolExecutor
    } finally {
        mainLock.unlock();
    }
    // 尝试终止
    tryTerminate();
}
public List<Runnable> shutdownNow() {
    List<Runnable> tasks;
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        // STOP状态:不再接受新任务且不再执行队列中的任务。
        advanceRunState(STOP);
        // 中断所有线程
        interruptWorkers();
        // 返回队列中还没有被执行的任务。
        tasks = drainQueue();
    }
    finally {
        mainLock.unlock();
    }
    tryTerminate();
    return tasks;
}

2.3.2 任务的提交

public class Test{

    public static void main(String[] args) {

        ExecutorService es = Executors.newCachedThreadPool();
        Future<String> future = es.submit(new Callable<String>() {
            @Override
            public String call() throws Exception {
                try {
                    TimeUnit.SECONDS.sleep(2);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                return "future result";
            }
        });
        try {
            String result = future.get();
            System.out.println(result);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

submit 方法

// submit()在ExecutorService中的定义
<T> Future<T> submit(Callable<T> task);

<T> Future<T> submit(Runnable task, T result);

Future<?> submit(Runnable task);
......
// submit方法在AbstractExecutorService中的实现
public Future<?> submit(Runnable task) {
    if (task == null) throw new NullPointerException();
    // 通过submit方法提交的Callable任务会被封装成了一个FutureTask对象。
    RunnableFuture<Void> ftask = newTaskFor(task, null);
    execute(ftask);
    return ftask;
}

2.3.3 扩展线程池

3. 常用线程池

线程池名称 说明
newCachedThreadPool 创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。
newFixedThreadPool 创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。
newScheduledThreadPool 创建一个定长线程池,支持定时及周期性任务执行。
newSingleThreadExecutor 创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO,LIFO,优先级)执行。

Executors

newSingleThreadExecutor

new ThreadPoolExecutor(1, 1,0L,TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());

newFixedThreadPool

new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());

newCachedThreadPool

new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>());

newScheduledThreadPool

new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);

4. 使用线程池注意

避免使用无界队列

ExecutorService executorService = new ThreadPoolExecutor(2, 2, 
                0, TimeUnit.SECONDS, 
                new ArrayBlockingQueue<>(512), // 使用有界队列,避免OOM
                new ThreadPoolExecutor.DiscardPolicy());

明确拒绝任务时的行为

获取处理结果和异常

ExecutorService executorService = Executors.newFixedThreadPool(4);
Future<Object> future = executorService.submit(new Callable<Object>() {
        @Override
        public Object call() throws Exception {
            throw new RuntimeException("exception in call~");// 该异常会在调用Future.get()时传递给调用者
        }
});
     
try {
  Object result = future.get();
} catch (InterruptedException e) {
  // interrupt
} catch (ExecutionException e) {
  // exception in Callable.call()
  e.printStackTrace();
}

性质不同的任务可用使用不同规模的线程池分开处理

参考资料

https://www.cnblogs.com/dolphin0520/p/3932921.html
https://blog.csdn.net/programmer_at/article/details/79799267#2-threadpoolexecutor
https://www.cnblogs.com/zhanjindong/p/java-concurrent-package-ThreadPoolExecutor.html
https://www.cnblogs.com/superfj/p/7544971.html
https://blog.csdn.net/qq1137623160/article/details/79772505
https://blog.csdn.net/programmer_at/article/details/79799267#4-threadpoolexecutor%E6%BA%90%E7%A0%81
https://www.cnblogs.com/leesf456/p/5585627.html

上一篇 下一篇

猜你喜欢

热点阅读