并发编程

理解线程池

2019-10-28  本文已影响0人  今年五年级

类继承结构

image.png

相关源码

Executor

public interface Executor {
    void execute(Runnable command);
}

ExecutorService

public interface ExecutorService extends Executor {
    //关闭线程池,已提交的任务继续执行,不接受继续提交新任务
    void shutdown();
    //关闭线程池,尝试停止正在执行的所有任务,不接受继续提交新任务
    //跟上面区别:会去停止当前正在进行的任务
    List<Runnable> shutdownNow();
    //线程池是否已经关闭
    boolean isShutdown();
    //如果调用了shutdown()或者shutdownNow()后,所有任务结束了,那么返回true
    //这个方法必须在调用shutdown或shutdownNow之后调用才会返回true
    boolean isTerminated();
    //先调用shutdown或者shutdownNow,然后调用这个方法等待所有线程真正完成,返回值意味着有无超时
    boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;
    //提交一个callable任务
    <T> Future<T> submit(Callable<T> task);
    //提交一个runnable任务,第二个参数将放在future中作为返回值,因为runnable的run方法并不会返回任何东西
    /**
     * runnable的void run()是没有返回值的,所以,通常,我们需要的话,会在submit中指定第二个参数为返回值
     * 其实到时候会通过这两个参数,将其包装成Callable,和Runnable的区别在于run()没有返回值,Callale的call
     * 方法有返回值,同时,如果运行出现异常,call()会抛出异常
     */
    <T> Future<T> submit(Runnable task,T result);
    //提交一个runnable任务
    Future<?> submit(Runnable task);
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException;
    //在超时时间内执行所有任务
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
        long timeout,
        TimeUnit unit
    ) throws InterruptedException;
    //只要其中有一个任务结束了,就可以返回,返回执行完的那个任务的结果
    <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException;
    <T> T invokeAny(Collection<? extends Callable<T>> tasks,long timeout,TimeUnit unit) throws InterruptedException, TimeoutException;
}

FutureTask

摘抄部分方法

public class FutureTask<V> implements RunnableFuture<V> {
    /**
     * Possible state transitions:
     * NEW -> COMPLETING -> NORMAL
     * NEW -> COMPLETING -> EXCEPTIONAL
     * NEW -> CANCELLED
     * NEW -> INTERRUPTING -> INTERRUPTED
     */
    private volatile int state;
    private static final int NEW          = 0;
    private static final int COMPLETING   = 1;
    private static final int NORMAL       = 2;
    private static final int EXCEPTIONAL  = 3;
    private static final int CANCELLED    = 4;
    private static final int INTERRUPTING = 5;
    private static final int INTERRUPTED  = 6;

    /** The underlying callable; nulled out after running */
    private Callable<V> callable;
    /** The result to return or exception to throw from get() */
    private Object outcome; // non-volatile, protected by state reads/writes
    /** The thread running the callable; CASed during run() */
    private volatile Thread runner;
    /** Treiber stack of waiting threads */
    private volatile WaitNode waiters;

    @SuppressWarnings("unchecked")
    private V report(int s) throws ExecutionException {
        Object x = outcome;
        if (s == NORMAL)
            return (V)x;
        if (s >= CANCELLED)
            throw new CancellationException();
        //如果执行过程中出错,走这一步,future有,future.get()抛出异常
        throw new ExecutionException((Throwable)x);
    }

    public FutureTask(Callable<V> callable) {
        if (callable == null)
            throw new NullPointerException();
        this.callable = callable;
        this.state = NEW;       // ensure visibility of callable
    }

    public FutureTask(Runnable runnable, V result) {
        this.callable = Executors.callable(runnable, result);
        this.state = NEW;       // ensure visibility of callable
    }

    public boolean isCancelled() {
        return state >= CANCELLED;
    }

    public boolean isDone() {
        return state != NEW;
    }
    //获取执行结果
    public V get() throws InterruptedException, ExecutionException {
        int s = state;
        if (s <= COMPLETING)
            s = awaitDone(false, 0L);
        return report(s);
    }
    public V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException {
        if (unit == null)
            throw new NullPointerException();
        int s = state;
        if (s <= COMPLETING &&
            (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
            throw new TimeoutException();
        return report(s);
    }
    protected void done() { }
    private int awaitDone(boolean timed, long nanos)
        throws InterruptedException {
        final long deadline = timed ? System.nanoTime() + nanos : 0L;
        WaitNode q = null;
        boolean queued = false;
        for (;;) {
            if (Thread.interrupted()) {
                removeWaiter(q);
                throw new InterruptedException();
            }

            int s = state;
            if (s > COMPLETING) {
                if (q != null)
                    q.thread = null;
                return s;
            }
            else if (s == COMPLETING) // cannot time out yet
                Thread.yield();
            else if (q == null)
                q = new WaitNode();
            else if (!queued)
                queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                     q.next = waiters, q);
            else if (timed) {
                nanos = deadline - System.nanoTime();
                if (nanos <= 0L) {
                    removeWaiter(q);
                    return state;
                }
                LockSupport.parkNanos(this, nanos);
            }
            else
                LockSupport.park(this);
        }
    }
}

线程池主要组件

image.png

上图没有考虑队列是否有界,提交任务时队列满了怎么办?什么情况下会创建新的线程?提交任务时线程池满了怎么办?空闲线程怎么关掉

我们经常会使用 Executors 这个工具类来快速构造一个线程池,开发者不需要关注太多的细节,只要知道自己需要一个线程池,仅仅提供必需的参数就可以了,其他参数都采用作者提供的默认值

    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
    }
    //最终指向的方法
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.acc = System.getSecurityManager() == null ?
                null :
                AccessController.getContext();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

线程池主要属性

  1. corePoolSize
  2. maximumPoolSize
    线程池允许创建的最大线程数
  3. keepAliveTime
    空闲线程的存活时间,如果某线程的空闲时间超过这个值都没有任务给它做,那么该线程可以被关闭了。注意这个值并不会对所有线程起作用,如果线程池中的线程数少于等于核心线程数 corePoolSize,那么这些线程不会因为空闲太长时间而被关闭,当然,也可以通过调用 allowCoreThreadTimeOut(true)使核心线程数内的线程也可以被回收
  4. workQueue
    任务队列,BlockingQueue 接口的某个实现(常使用 ArrayBlockingQueue 和LinkedBlockingQueue)
  5. threadFactory
    用于生成线程,一般我们可以用默认的就可以了。通常,我们可以通过它将我们的线程的名字设置得比较可读一些,如 Order-Thread-1, Product-Thread-2 类似这样。
  6. handler
    当线程池已经满了,但是又有新的任务提交的时候,该采取什么策略由这个来指定。有几种方式可供选择,像抛出异常、直接拒绝然后返回等,也可以自己实现相应的接口实现自己的逻辑

其他属性

用32位整数来保存线程池运行状态(高3位)和线程池线程数量(低29位)

    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    //计数位:一个int整数是4个字节,一个字节占用8位,所以共占用32位     32-3=29
    private static final int COUNT_BITS = Integer.SIZE - 3;
    //线程池容量1左移29位,为1*2^29-1=536,870,911
    //00000000 00000000 00000000 00000001->001 0000 00000000 00000000 00000000
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

    // 线程池运行状态保存在32位中的高3位
    //-1在Java底层是由32个1表示的,左移29位的话,即111 00000 00000000 00000000 00000000
    //也就是低29位全部为0,高3位全部为1的话,表示RUNNING状态,即-536870912;
    private static final int RUNNING    = -1 << COUNT_BITS;
    //0在Java底层是由32个0表示的,无论左移多少位,还是32个0,即000 00000 00000000 00000000 00000000
    //也就是低29位全部为0,高3位全部为0的话,表示SHUTDOWN状态,即0
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    //1在Java底层是由前面的31个0和1个1组成的,左移29位的话,即001 00000 00000000 00000000 00000000
    //也就是低29位全部为0,高3位为001的话,表示STOP状态,即536870912
    private static final int STOP       =  1 << COUNT_BITS;
    //2在Java底层是由前面的30个0和1个10组成的,左移29位的话,即010 00000 00000000 00000000 00000000
    //也就是低29位全部为0,高3位为010的话,表示TIDYING状态,即1073741824
    private static final int TIDYING    =  2 << COUNT_BITS;
    //3在Java底层是由前面的30个0和1个11组成的,左移29位的话,即011 00000 00000000 00000000 00000000
    //也就是低29位全部为0,高3位为011的话,表示TERMINATED状态,即1610612736
    private static final int TERMINATED =  3 << COUNT_BITS;

    //~是按位取反的意思,CAPACITY表示的是高位的3个0,和低位的29个1
    //而~CAPACITY则表示高位的3个1,29个低位的0,然后再与入参c执行按位与操作,即高3位保持原样
    //低29位全部设置为0,也就获取了线程池的运行状态runState
    private static int runStateOf(int c)     { return c & ~CAPACITY; }
    //也就是与000 11111 11111111 11111111 11111111进行与操作,c的前三位通过与000进行与操作
    //无论c前三位为何值,最终都会变成000,也就是舍弃前三位的值,而c的低29位与29个1进行与操作
    //c的低29位还是会保持原值,这样就从AtomicInteger ctl中解析出了workerCount的值
    private static int workerCountOf(int c)  { return c & CAPACITY; }

    //或操作:规则:即  0 | 0= 0 ,  1 | 0= 1  , 0 | 1= 1  ,  1 | 1= 1 
    private static int ctlOf(int rs, int wc) { return rs | wc; }

    private static boolean runStateLessThan(int c, int s) {
        return c < s;
    }

    private static boolean runStateAtLeast(int c, int s) {
        return c >= s;
    }
    private static boolean isRunning(int c) {
        return c < SHUTDOWN;
    }

线程池的各个状态和状态变化转换过程

  1. running:正常状态,接受新的任务,处理等待队列中的任务
  2. shutdow:不接受新任务提交,但是会继续处理等待队列中任务
  3. stop:不接受新的任务提交,不再处理等待队列中的任务,中断正在执行的任务
  4. tidying:所有任务都销毁了,workCount=0,线程池状态在转换为这个状态的时候,会执行terminated()
  5. terminated:terminated()方法结束后,线程池的状态变为这个

源码解读

内部类worker

Doug Lea 把线程池中的线程包装成了一个个 Worker(工人),就是线程池中做任务的线程。所以到这里,我们知道任务是 Runnable(内部变量名叫 task 或 command),线程是 Worker

注意:worker构造方法创建线程传入的参数是自身对象

 private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
        //执行任务的线程
        final Thread thread;
        //runnable是任务,之所以叫firstTask是因为线程池中线程可以执行多个任务
        //初始化如果传了这里有,如果没传就在任务队列中取任务getTask()
        Runnable firstTask;
        //存放此线程完成的任务数
        volatile long completedTasks;
        //初始化这个worker,传入参数为第一个任务,也可以为空
        Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            //创建当前worker所需要的线程
            this.thread = getThreadFactory().newThread(this);
        }

        /** Delegates main run loop to outer runWorker  */
        public void run() {
            runWorker(this);
        }
        //...下面省略,就是用 AQS 操作,来获取这个线程的执行权,用了独占锁
    }

AbstractExecutorService

public abstract class AbstractExecutorService implements ExecutorService {

    protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
        return new FutureTask<T>(runnable, value);
    }

    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
        return new FutureTask<T>(callable);
    }

    public Future<?> submit(Runnable task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<Void> ftask = newTaskFor(task, null);
        execute(ftask);
        return ftask;
    }
    //我们这里调用submit方法,传入一个期望的返回值
    public <T> Future<T> submit(Runnable task, T result) {
        if (task == null) throw new NullPointerException();
        //将执行结果包装为RunnableFuture
        RunnableFuture<T> ftask = newTaskFor(task, result);
        //具体的实现在子类ThreadPoolExecutor中
        //又因为runnableFuture实现了runnable接口,因此可以传入
        execute(ftask);
        return ftask;
    }

    /**下面方法来自FutureTask类*/
    public FutureTask(Runnable runnable, V result) {
        this.callable = Executors.callable(runnable, result);
        this.state = NEW;       // ensure visibility of callable
    }

    /**下面方法来自Excutors工具类,即将result和runnable包装为callable并返回*/
    public static <T> Callable<T> callable(Runnable task, T result) {
        if (task == null)
            throw new NullPointerException();
        return new RunnableAdapter<T>(task, result);
    }
    static final class RunnableAdapter<T> implements Callable<T> {
        final Runnable task;
        final T result;
        RunnableAdapter(Runnable task, T result) {
            this.task = task;
            this.result = result;
        }
        public T call() {
            task.run();
            return result;
        }
    }

execute

下面开始分析ThreadPoolExecutor实现的execute方法

    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        //获取的时候-536870912
        int c = ctl.get();
        //workerCountOf(c)=0
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }

addWorker

    private boolean addWorker(Runnable firstTask, boolean core) {
        //外层retry循环
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);
            //时刻牢记,双与意思是执行右边的前提是左边必须成立!

            /*
             * 判断线程池是否已经关闭
             *
             * 当线程池运行状态为shutdown,stop,tidying或者terminated的时候
             * (运行状态=shutdown,同时第一个任务为空,同时任务队列不为空)不成立的话
             */
            if (rs >= SHUTDOWN &&! (rs == SHUTDOWN &&firstTask == null &&! workQueue.isEmpty()))
                //直接返回创建失败
                return false;

            //内层循环
            for (;;) {
                int wc = workerCountOf(c);
                //如果当前线程池中工作线程数 ≥ 临界值了
                // 当前要创建的是核心线程:当前线程池工作线程数 ≥ 核心线程数
                // 当前要创建的不是核心线程:当前线程池工作线程数 ≥ 最大线程数
                if (wc >= CAPACITY ||wc >= (core ? corePoolSize : maximumPoolSize))
                    //直接返回创建失败
                    return false;
                //如果CAS增加工作线程数目失败,即有其他线程已经更改过工作线程数目
                if (compareAndIncrementWorkerCount(c))
                    //退出外层循环
                    break retry;
                //重新获取一下线程运行状态,如果两个不一致就退出外层循环
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());

                    if (rs < SHUTDOWN ||
                            (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

上面调用t.start以后开始执行下面,因为

        Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }

所以实际执行的是内部类worker的run()

public void run() {
    runWorker(this);
}

runWorker

 final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        //获取当前工人的第一个任务,如果有的话
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // 适当w工人当前持有的锁
        boolean completedAbruptly = true;
        try {
            //循环获取:当第一个任务不为空或者任务队列中获取的任务不为空的时候
            while (task != null || (task = getTask()) != null) {
                w.lock();
                //runStateAtLeast: c1>=c2  || (Thread.interrupted() &&runStateAtLeast(ctl.get(), STOP)
                //当前线程没有被中断
                if ((runStateAtLeast(ctl.get(), STOP) ||(Thread.interrupted() &&runStateAtLeast(ctl.get(), STOP))) &&!wt.isInterrupted())
                    //通知当前线程应当中断
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        //执行这个worker被分配的第一个任务
                        task.run();
                    } catch (RuntimeException | Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    //执行完
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            //worker进程退出
            processWorkerExit(w, completedAbruptly);
        }
    }

getTask

//从任务队列中拉取任务
private Runnable getTask() {
        //拉取任务超时标记
        boolean timedOut = false; // Did the last poll() time out?

        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            //运行状态>=STOP或者 运行状态=SHUTDOWN+队列为空
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }

            //当前线程池工作线程数量
            int wc = workerCountOf(c);

            //是否允许超时=是否允许核心线程超时回收||当前线程池工作线程数量>核心线程数量
            //如果当前线程池数量小于核心线程数量且不允许核心线程超时则false
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

            //只要(条件1:当前线程池中线程数量>设置的最大线程池容量)或者(条件2:超时设置为真并且之前拉取任务出现超时情况)就可以执行到右边
            //如果是条件2过来的则判断当前池中线程数量是否大于1
            //如果是条件1过来的则判断当前任务队列是否为空
            if (
(wc > maximumPoolSize || (timed && timedOut))
                &&
(wc > 1 || workQueue.isEmpty())
) {
                //扣减当前工人数量
                if (compareAndDecrementWorkerCount(c))
                    //意味着关闭线程
                    return null;
                continue;
            }

            try {
                //任务队列中拉取任务
                Runnable r = timed ?
                        workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                        workQueue.take();
                if (r != null)
                    //拉取到,直接返回这个任务
                    return r;
                timedOut = true;    //没拉取到,超时
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }

Executors

  1. 生成固定大小线程池
    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());
    }

最大线程数设置为与核心线程数相等,此时 keepAliveTime 设置为 0(因为这里它是没用的,即使不为 0,线程池默认也不会回收 corePoolSize 内的线程),任务队列采用 LinkedBlockingQueue,无界队列

过程分析:刚开始,每提交一个任务都创建一个 worker,当 worker 的数量达到 nThreads 后,不再创建新的线程,而是把任务提交到 LinkedBlockingQueue 中,而且之后线程数始终为 nThreads。

  1. 生成只有一个线程的固定线程池
    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()));
    }

这个更简单,和上面的一样,只要设置线程数为 1 就可以了:

  1. 生成一个需要的时候就创建新的线程,同时可以复用之前创建的线程(如果这个线程当前没有任务)的线程池
    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>());
    }

核心线程数为 0,最大线程数为 Integer.MAX_VALUE,keepAliveTime 为 60 秒,任务队列采用 SynchronousQueue

这种线程池对于任务可以比较快速地完成,有比较好的性能。如果线程空闲了 60 秒都没有任务,那么将关闭此线程并从线程池中移除。所以如果线程池空闲了很长时间也不会有问题,因为随着所有的线程都会被关闭,整个线程池不会占用任何的系统资源

ScheduledExecutorService

周期执行任务的线程池
常用方法:
<V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit)
创建并执行在给定延迟后启用的单次操作。

ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit)
创建并执行在给定延迟后启用的单次操作。

ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit)
创建并执行在给定的初始延迟之后,以给定的时间间隔执行周期性动作。即在 initialDelay 初始延迟后,initialDelay+period 执行第一次,initialDelay + 2 * period 执行第二次,依次类推。\color{red}{下一次执行任务的时间与任务执行过程花费的时间无关,只与period有关!}并且如果任务的任何一个执行遇到异常,则后续执行都会被取消

public class MyRunable implements Runnable {
    private AtomicInteger atomicInteger;
    private Random random;
 
    public MyRunable() {
        atomicInteger = new AtomicInteger(0);
        random = new Random();
    }

    @Override
    public void run() {
        try {
            String threadName = Thread.currentThread().getName();
            System.out.println("start-任务执行开始:" + new Date() + ":" + threadName);
            /**使用随机延时[0-3]秒来模拟执行任务*/
            int sleepNumber = random.nextInt(5);
            TimeUnit.SECONDS.sleep(sleepNumber);
            System.out.println(atomicInteger.get());
            if (atomicInteger.getAndAdd(1) == 3) {
                int error = 10 / 0;  //模拟抛出异常
            }
            System.out.println("end-任务执行完毕:" + new Date() + ":" + threadName);
        } catch (Exception e) {  //最大范围包裹异常保证后续任务正常执行
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        ScheduledExecutorService scheduledExecutorService =
                Executors.newSingleThreadScheduledExecutor();
        System.out.println("3秒后开始执行计划线程池服务..." + new Date());
        /**每间隔4秒执行一次任务*/
        scheduledExecutorService.scheduleAtFixedRate(new MyRunable(),
                3, 4, TimeUnit.SECONDS);
    }
}

ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit)
fixedDelay它的间隔时间是根据上次的任务结束的时候开始计时的。比如一个方法上设置了fixedDelay=5*1000,那么当该方法某一次执行结束后,开始计算时间,当时间达到5秒,就开始再次执行该方法

上一篇 下一篇

猜你喜欢

热点阅读