如何例用Executor创建后台线程池

2020-08-18  本文已影响0人  朱兰婷

Executor

执行已提交的Runnable任务的对象接口。这个接口提供了一种将任务提交和任务运行机制(包括线程使用、调度等的详细信息)分离的方法。

public interface Executor {

    /**
     * 在将来的某些时候执行给定任务。根据Executor的实现判定是在一个新线程还是线程池还是调用线 
     * 程中执行。
     */
    void execute(Runnable command);
}

通常用Executor来代替显示的创建Thread。如要在单独的线程执行某项任务时,使用

    public void test() {
        Executor executor = new ThreadPerTaskExecutor();
        executor.execute(new RunnableTask1());
    }

    class ThreadPerTaskExecutor implements Executor {
        public void execute(Runnable r) {
            new Thread(r).start();
        }
    }

而不是

new Thread(new RunnableTask()).start()

很多Executor的实现都对任务的执行方式和时间强加的限制,如下面定义了一个顺序执行任务的Executor:

class SerialExecutor implements Executor {
        final Queue<Runnable> tasks = new ArrayDeque<>();
        final Executor executor;
        Runnable active;

        SerialExecutor(Executor executor) {
            this.executor = executor;
        }

        public synchronized void execute(final Runnable r) {
            tasks.add(() -> {
                try {
                    r.run();
                } finally {
                    scheduleNext();
                }
            });
            if (active == null) {
                scheduleNext();
            }
        }

        protected synchronized void scheduleNext() {
            if ((active = tasks.poll()) != null) {
                executor.execute(active);
            }
        }
    }

java.util.concurrent包中提供的Executor的实现ExecutorService是一个更广泛的被使用的接口。ThreadPoolExecutor提供一个可扩展的线程池实现。Executors类为这些Executor提供了方便使用的工厂方法。

下面为java.util.concurrent包中Executor框架的主要类关系图: Executor框架.png

ExecutorService

  1. 一个Executor;
  2. 提供shutdown()方法终止Executor的执行;
  3. 提供submit()方法返回一个Future来追踪异步任务的执行进度;
  4. 提供invokeAll()方法返回一个Future列表来追踪多个异步任务的执行进度。

如果我们现在创建了一个异步执行的网络任务:

    class NetworkService implements Runnable {
        private final ServerSocket serverSocket;
        private final ExecutorService pool;

        public NetworkService(int port, int poolSize)
                throws IOException {
            serverSocket = new ServerSocket(port);
            pool = Executors.newFixedThreadPool(poolSize);
        }

        public void run() { // run the service
            try {
                for (; ; ) {
                    pool.execute(new Handler(serverSocket.accept()));
                }
            } catch (IOException ex) {
                pool.shutdown();
            }
        }
    }

    class Handler implements Runnable {
        private final Socket socket;

        Handler(Socket socket) {
            this.socket = socket;
        }

        public void run() {
            // read and service request on socket
        }
    }

可以调用shutdown()方法终止它的执行:

    void shutdownAndAwaitTermination(ExecutorService pool) {
        pool.shutdown(); // Disable new tasks from being submitted
        try {
            // Wait a while for existing tasks to terminate
            if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
                pool.shutdownNow(); // Cancel currently executing tasks
                // Wait a while for tasks to respond to being cancelled
                if (!pool.awaitTermination(60, TimeUnit.SECONDS))
                    System.err.println("Pool did not terminate");
            }
        } catch (InterruptedException ie) {
            // (Re-)Cancel if current thread also interrupted
            pool.shutdownNow();
            // Preserve interrupt status
            Thread.currentThread().interrupt();
        }
    }
ScheduledExecutorService
  1. 一个ExecutorService;
  2. 其scheduleXXX()方法创建并执行ScheduledFuture;
  3. 该ScheduledFuture可以在给定的延迟后变为启动状态,并开始执行任务;
  4. 该ScheduledFuture可以定期执行任务;
  5. 通过这个ScheduledFuture可以get任务执行结果,可以cancel这个任务。

如需要创建一个在一小时内每隔10s就响一次的报警器:

    class BeeperControl {
        private final ScheduledExecutorService scheduler =
                Executors.newScheduledThreadPool(1);

        public void beepForAnHour() {
            final Runnable beeper = () -> System.out.println("beep");

            final ScheduledFuture<?> beeperHandle =
                    scheduler.scheduleAtFixedRate(beeper, 10, 10, SECONDS);

            scheduler.schedule(() -> {
                beeperHandle.cancel(true);
            }, 60 * 60, SECONDS);
        }
    }
AbstractExecutorService
  1. 一个ExecutorService;
  2. 其submit、invokeAny、invokeAll方法在执行任务前,把任务封装成RunnableFuture,方便跟踪执行进度。
    /**
     * Returns a {@code RunnableFuture} for the given runnable and default
     * value.
     *
     * @param runnable the runnable task being wrapped
     * @param value the default value for the returned future
     * @param <T> the type of the given value
     * @return a {@code RunnableFuture} which, when run, will run the
     * underlying runnable and which, as a {@code Future}, will yield
     * the given value as its result and provide for cancellation of
     * the underlying task
     * @since 1.6
     */
    protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
        return new FutureTask<T>(runnable, value);
    }

    public Future<?> submit(Runnable task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<Void> ftask = newTaskFor(task, null);
        execute(ftask);
        return ftask;
    }

ExecutorService的实现

ThreadPoolExecutor

  1. 一个AbstractExecutorService;
  2. 使用线程池中的某个线程执行提交的任务,该线程池通常由Executors中的工厂方法创建;
  3. FIFO;
  4. 其execute的设计思想为:
if(threadCount < 池中需要在空闲状态也要保持的线程数){
      new thread;
      把当前任务做为新建线程的第一个任务执行;
} else if(任务放入等待队列){
     在队列中等待在运行的线程将它取出并执行;
} else if(threadCount < 池中最多可拥有的线程数){ //此时意味着等待队列已满,放入失败
     new thread;
     把当前任务做为新建线程的第一个任务执行;
} else {
     拒绝该任务的执行,如抛出异常;
}
  1. 线程池中维持corePoolSize数量的线程数,如果有线程在线程池shutdown之前因为执行失败而中断,则在执行后续任务时会新建一个替代它的线程。

    具体原理如下: ThreadPoolExecutor.png
其中定义了线程池的control state(ctl): ThreadPoolExecutor线程池状态.png
各个ctl之间的关系如下: ThreadPoolExecutor runState.png

所以shutdown()代表停止接收新任务但仍需执行现有任务,shutdownNow()代表停止接收新任务也不再执行现有任务。

ThreadPoolExecutor在实现时用到了BlockQueue和AbstracQueuedSynchronizer。

BlockQueue比较常见的实现为LinkedBlockingQueue。其原理如下: LinkedBlockingQueue.png AbstractQueuedSynchronizer.png

可以看到Lock和BlockQueue是非常实用的。
Lock替代了synchronized,功能更强大使用也更简便,如下:

    class BoundedBuffer {
        final Lock lock = new ReentrantLock();
        final Condition notFull = lock.newCondition();
        final Condition notEmpty = lock.newCondition();

        final Object[] items = new Object[100];
        int putptr, takeptr, count;

        public void put(Object x) throws InterruptedException {
            lock.lock();
            try {
                while (count == items.length)
                    notFull.await();
                items[putptr] = x;
                if (++putptr == items.length) putptr = 0;
                ++count;
                notEmpty.signal();
            } finally {
                lock.unlock();
            }
        }

        public Object take() throws InterruptedException {
            lock.lock();
            try {
                while (count == 0)
                    notEmpty.await();
                Object x = items[takeptr];
                if (++takeptr == items.length) takeptr = 0;
                --count;
                notFull.signal();
                return x;
            } finally {
                lock.unlock();
            }
        }
    }

BlockingQueue非常适用于生产-消费者模型,如下:

class Producer implements Runnable {
    private final BlockingQueue queue;
    Producer(BlockingQueue q) { queue = q; }
    public void run() {
        try {
            while (true) { queue.put(produce()); }
        } catch (InterruptedException ex) { ... handle ...}
    }
    Object produce() { ... }
}

class Consumer implements Runnable {
    private final BlockingQueue queue;

    Consumer(BlockingQueue q) {
        queue = q;
    }

    public void run() {
        try {
            while (true) {
                consume(queue.take());
            }
        } catch (InterruptedException ex) { ...handle ...}
    }

    void consume(Object x) { ...}
}

class BlockingQueueTest {
    void main() {
        BlockingQueue q = new SomeQueueImplementation();
        LockTest.Producer p = new LockTest.Producer(q);
        Consumer c1 = new Consumer(q);
        Consumer c2 = new Consumer(q);
        new Thread(p).start();
        new Thread(c1).start();
        new Thread(c2).start();
    }
}

ScheduledThreadPoolExecutor

  1. 一个可以执行延迟任务和周期性任务的ThreadPoolExecutor;
  2. 如果提交的任务在运行前被取消,则该任务会在延迟结束后被取消;
  3. 如果多个任务有相同的执行时间,采用FIFO;

因为功能不同,其实现和ThreadPoolExecutor也不一样:

  1. maximumPoolSize == corePoolSize;且等待任务队列大小没有限制为Integer.MAX_VALUE;
  2. 设置了shutdown策略来决定哪些任务在线程池进入SHUTDOWN状态后不需要再执行;
  3. 采用DelayedWorkQueue而非LinkedBlockingQueue,加快取消速度(时间复杂度从O(n)降至O(log n));
  4. 在插入队列时将所有任务封装成ScheduledFutureTask给DelayedWorkQueue使用;
具体实现原理如下: ScheduledThreadPoolExecutor.png

ForkJoinPool

  1. 继承AbstractExecutorService;
  2. 将所有提交的或执行的任务封装成ForkJoinTask执行;
  3. 需要将任务分解成足够小的子任务执行,思路如下:
解决(问题):
    if 问题足够小:
        直接解决问题 (顺序算法)
    else:
        for 部份 in 细分(问题)
            fork 子任务来解决(部份)
        join 在前面的循环中生成的所有子任务
        return 合并的结果
fork-join model.png
  1. 每个Worker都有其自己的本地任务队列;
  2. 每个Worker都可以去其它Worker的任务队列中窃取任务执行;


    work stealing实现方式.png
  3. workQueues保存了WorkQueue列表,其中奇数索引代表内部任务(unshared queues,本地任务列表),偶数索引代表外部任务(shared queues,窃取的任务列表),因此workQueues的大小始终为2的幂次方。
ForkJoinPool workQueues.png

其具体实现原理如下:


ForkJoinPool.png

Executors

提供java.util.concurrent包中的Executor、ExecutorService、ScheduledExecutorService、ThreadFactory的工厂方法。

newFixedThreadPool()

创建一个指定corePoolSize和maximumPoolSize都为nThreads的ThreadPoolExecutor。

适用于负载较重的并行运算。

newWorkStealingPool()

创建ForkJoinPool,指定处理还没joined的forked任务的规则为FIFO(上面介绍中的ForkJoinPool默认采用LIFO)。

适用于“分而治之”递归运算计算密集的运算。

newScheduledThreadPool()

创建一个指定corePoolSize的ScheduledThreadPoolExecutor。

适用于需要定期或周期性执行的运算。

newSingleThreadExecutor()

创建一个仅仅只能拥有一个执行线程的ThreadPoolExecutor。

适用于串行运算。

newCachedThreadPool()

创建一个线程数量不限的ThreadPoolExecutor(corePoolSize=0, maximumPoolSize=Integer.MAX_VALUE)。

适用于负载较轻的数量繁多的短期运算。

newSingleThreadScheduledExecutor()

创建一个仅仅只能拥有一个执行线程的ScheduledThreadPoolExecutor。

适用于串行的需要定期或周期性执行的运算。

MoreExecutors

提供java.util.concurrent包中的Executor、ExecutorService、ThreadFactory的工厂方法。

newSequentialExecutor(Executor delegate)

delegate:实际执行任务的底层Executor

创建一个实际由delegate执行任务的SequentialExecutor。

SequentialExecutor:由delegate委拖任务执行程序实际执行任务的采用FIFO规则串行执行任务的Executor。

适用于需要按FIFO规则执行的串行运算。

SequentialExecutor.png

listeningDecorator(ExecutorService delegate)

delegate:实际执行任务的底层Executor

创建一个实际由delegate执行任务的ListeningExecutorService。

ListeningExecutorService:可以在任务完成后再执行给定任务的ExecutorService。

适用于需要将运算链接在一起的场景。

ListeningExecutorService.png

应用

创建一个在非UI线程执行并行任务的Executor

    @Provides
    @Annotations.NonUiParallel
    @Singleton
    static ExecutorService provideNonUiThreadPool() {
        return Executors.newFixedThreadPool(
                5,
                runnable -> {
                    Log.i("DialerExecutorModule.newThread", "creating low priority thread");
                    Thread thread = new Thread(runnable, "DialerExecutors-LowPriority");
                    // Java thread priority 4 corresponds to Process.THREAD_PRIORITY_BACKGROUND (10)
                    thread.setPriority(4);
                    return thread;
                });
    }

创建一个在非UI线程执行串行任务的Executor

    static ScheduledExecutorService provideNonUiSerialExecutorService() {
        return Executors.newSingleThreadScheduledExecutor(
                runnable -> {
                    Log.i("NonUiTaskBuilder.newThread", "creating serial thread");
                    Thread thread = new Thread(runnable, "DialerExecutors-LowPriority-Serial");
                    // Java thread priority 4 corresponds to Process.THREAD_PRIORITY_BACKGROUND (10)
                    thread.setPriority(4);
                    return thread;
                });
    }

创建一个在UI线程执行延迟或周期性任务的Executor

    static ScheduledExecutorService provideUiSerialExecutorService() {
        return Executors.newSingleThreadScheduledExecutor(
                runnable -> {
                    Log.i("DialerExecutorModule.newThread", "creating serial thread");
                    Thread thread = new Thread(runnable, "DialerExecutors-HighPriority-Serial");
                    // Java thread priority 5 corresponds to Process.THREAD_PRIORITY_DEFAULT (0)
                    thread.setPriority(5);
                    return thread;
                });
    }

创建一个在UI线程执行的可以注册listener的轻量级Executor

    @Provides
    @Annotations.UiParallel
    @Singleton
    static ExecutorService provideUiThreadPool() {
        return (ExecutorService) AsyncTask.THREAD_POOL_EXECUTOR;
    }

    static ListeningExecutorService provideLightweightExecutor(@Annotations.UiParallel ExecutorService delegate) {
        return MoreExecutors.listeningDecorator(delegate);
    }

创建一个在非UI线程异步执行运算的可以注册listener的Executore

    static ListeningExecutorService provideBackgroundExecutor(
            @Annotations.NonUiParallel ExecutorService delegate) {
        return MoreExecutors.listeningDecorator(delegate);
    }

总结

Executor框架把任务的提交和执行解耦,可以简化并发编程。线程能过线程的复用,可以降低资源消耗,提高响应速度等。

上一篇下一篇

猜你喜欢

热点阅读