10-Executor框架

2019-03-21  本文已影响0人  加夕

Java的线程既是工作单元,也是执行机制。JDK 5开始,把工作单元与执行机制分离开来。工作单元包括Runnable和Callable,而执行机制由Executor框架提供。

1.Executor框架简介

①Executor框架的两级调度模型

java.lang.Thread被一对一映射为本地操作系统线程。Java线程启动时会创建一个本地操作系统线程;当该Java线程终止时,这个操作系统线程也会被回收。操作系统会调度所有线程并将它们分配给可用的CPU。

在上层,Java多线程程序通常把应用分解为若干个任务,然后使用用户级的调度器(Executor框架)将这些任务映射为固定数量的线程;在底层,操作系统内核将这些线程映射到硬件处理器上。

②Executor框架的结构与成员

1)Executor框架的结构
2)Executor框架的成员

newSingleThreadExecutor适用于需要保证顺序地执行各个任务;并且在任意时间点,不会有多个线程是活动的应用场景。

    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }
    public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>(),
                                    threadFactory));
    }

newCachedThreadPool是大小无界的线程池,适用于执行很多的短期异步任务的小程序,或者是负载较轻的服务器。

    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }
    public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>(),
                                      threadFactory);
    }

2.ScheduleThreadPoolExecutor详解

继承自ThreadPoolExecutor。主要用来在给定的延迟后执行运行任务,或者定期执行任务。功能与Timer类似,但功能更强大、更灵活。Timer对应的是单个后台线程,而ScheduleThreadPoolExecutor可以在构造函数中指定多个对应的后台线程数。

①ScheduleThreadPoolExecutor的运行机制

ScheduleThreadPoolExecutor为了实现周期性的执行任务,对ThreadPoolExecutor做了如下的修改。

②ScheduleThreadPoolExecutor的实现

ScheduledFutureTask主要包含3个成员变量:

        public RunnableScheduledFuture<?> take() throws InterruptedException {
            //获取锁
            final ReentrantLock lock = this.lock;
            lock.lockInterruptibly();
            try {
                for (;;) {
                    //获取周期任务
                    RunnableScheduledFuture<?> first = queue[0];
                    //任务数组为空
                    if (first == null)
                        available.await();//等待
                    else {
                        long delay = first.getDelay(NANOSECONDS);
                        if (delay <= 0) 
                            return finishPoll(first);//获取头部元素,将最后一个元素放到第一位,并自上而下添加到其堆排序点。
                        //头部元素的time时间比当前时间大
                        first = null; // don't retain ref while waiting
                        if (leader != null) 
                            available.await();
                        else {
                            Thread thisThread = Thread.currentThread();
                            leader = thisThread; 
                            try {
                                //到condition中等待到time时间
                                available.awaitNanos(delay);
                            } finally {
                                if (leader == thisThread)
                                    leader = null;
                            }
                        }
                    }
                }
            } finally {
                if (leader == null && queue[0] != null)
                    available.signal();
                lock.unlock();//释放锁
            }
        }
        public boolean offer(Runnable x) {
            if (x == null)
                throw new NullPointerException();
            RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
            final ReentrantLock lock = this.lock;
            lock.lock();//1 获取锁
            try {
                int i = size;
                if (i >= queue.length)
                    grow();//调整堆数组大小
                size = i + 1;
                if (i == 0) {
                    queue[0] = e;
                    setIndex(e, 0);
                } else {
                    siftUp(i, e);//2.1 在底部添加到堆排序点
                }
                if (queue[0] == e) {
                    leader = null;
                    available.signal();//2.2 通知
                }
            } finally {
                lock.unlock();//3 释放锁
            }
            return true;
        }

3.FutureTask详解

①FutureTask简介

实现了Future接口和Runnable接口,可以交给Executor执行,也可以由调用线程直接执行(FutureTask.run())。

FutureTask.get():未启动和已启动状态时调用,会导致线程阻塞。已完成状态时,调用线程立即返回结果或抛出异常。

FutureTask.cancel():未启动状态时,调用将导致此任务永远不会被执行。已启动状态时,cancel(true)将以中断执行此任务线程的方式来视图停止任务;cancel(false)将不会对正在执行此任务的线程产生任何影响(让正在执行的任务运行完成)。已完成状态时,执行cancel返回false。

②FutureTask的使用

当一个线程需要等待另一个线程把某个任务执行完成后才能继续执行,此时可以使用FutureTask。

假设有多个线程执行若干个任务,每个任务最多只能被执行一次,当多个线程视图同时执行同一个任务时,只允许一个线程执行任务,其他线程需要等待这个任务执行完成后才能继续执行。示例代码:

    private final ConcurrentHashMap<Object, Future<String>> taskCache = new ConcurrentHashMap<>();//每个任务只执行一次,多个线程可以获取到执行的结果
    private String executionTask(final String taskName) throws ExecutionException, InterruptedException {
        while (true) {
            Future<String> future = taskCache.get(taskName);    //1.1   执行完1.3后执行  2.1
            if (future == null) {
                FutureTask<String> futureTask = new FutureTask<>(() -> taskName);//1.2
                //如果存在taskName,不更改value,返回旧值,如果不存在,put
                future = taskCache.putIfAbsent(taskName, futureTask);//1.3
                if (future == null) {//put成功了
                    future = futureTask;
                    futureTask.run();//1.4执行任务
                }
            }
            try {
                return future.get();//1.5 2.2
            } catch (CancellationException e) {
                taskCache.remove(taskName, future);//当taskName和future有映射关系的时候,才移除
            }
        }
    }

当两个线程试图同时执行同一个任务时,如果Thread1执行了1.3后Thread2执行2.1,那么接下来Thread2将在2.2等待,知道Thread1执行完成1.4后Thread2才能从2.2返回。

上一篇 下一篇

猜你喜欢

热点阅读