线程池工具类

2021-04-24  本文已影响0人  眼中有码
public final class ExecutorUtils {
    private static final String TAG = "ExecutorUtils";

    private Boolean isPaused = false;
    private ThreadPoolExecutor executor;
    private static final ReentrantLock LOCK = new ReentrantLock();
    private Condition pauseCondition;
    private static final Handler MAIN_HANDLER = new Handler(Looper.getMainLooper());

    private volatile static ExecutorUtils instance;

    public static ExecutorUtils getInstance() {
        if (instance == null) {
            synchronized (ExecutorUtils.class) {
                if (instance == null) {
                    instance = new ExecutorUtils();
                }
            }
        }
        return instance;
    }

    private ExecutorUtils() {
        init();
    }

    private void init() {
        pauseCondition = LOCK.newCondition();
        int cpuCount = Runtime.getRuntime().availableProcessors();
        int corePoolSize = cpuCount + 1;
        int maxPoolSize = cpuCount * 2 + 1;
        PriorityBlockingQueue<Runnable> blockingQueue = new PriorityBlockingQueue<Runnable>();
        long keepAliveTime = 30L;
        TimeUnit unit = TimeUnit.SECONDS;
        AtomicLong seq = new AtomicLong();
        ThreadFactory threadFactory = r -> {
            Thread thread = new Thread(r);
            thread.setName("ExecutorUtils-" + seq.getAndIncrement());
            return thread;
        };

        executor = new ThreadPoolExecutor(
                corePoolSize,
                maxPoolSize,
                keepAliveTime,
                unit,
                blockingQueue,
                threadFactory
        ) {
            @Override
            protected void beforeExecute(Thread t, Runnable r) {
                super.beforeExecute(t, r);
                if (isPaused) {
                    LOCK.lock();
                    try {
                        pauseCondition.await();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } finally {
                        LOCK.unlock();
                    }
                }
            }

            @Override
            protected void afterExecute(Runnable r, Throwable t) {
                super.afterExecute(r, t);
                //监控线程池耗时任务,线程创建数量,正在运行的数量
                Log.e(TAG, "已执行完的任务的优先级是:" + ((PriorityRunnable) r).mPriority);
            }
        };
    }

    public void execute(Runnable runnable) {
        this.execute(0, runnable);
    }

    public void execute(@IntRange(from = 0, to = 10) Integer priority, Runnable runnable) {
        executor.execute(new PriorityRunnable(priority, runnable));
    }

    public <T> void execute(Callable<T> runnable) {
        this.execute(0, runnable);
    }

    public <T> void execute(@IntRange(from = 0, to = 10) Integer priority, Callable<T> runnable) {
        executor.execute(new PriorityRunnable(priority, runnable));
    }


    public abstract static class Callable<T> implements Runnable {
        @Override
        public void run() {
            MAIN_HANDLER.post(this::onPrepare);
            T t = onBackground();
            //移除所有消息.防止需要执行onCompleted了,onPrepare还没被执行,那就不需要执行了
            MAIN_HANDLER.removeCallbacksAndMessages(null);
            MAIN_HANDLER.post(() -> onCompleted(t));
        }

        public void onPrepare() {
        }

        protected abstract T onBackground();

        public abstract void onCompleted(T t);
    }


    static class PriorityRunnable implements Runnable, Comparable<PriorityRunnable> {
        private final int mPriority;
        private final Runnable mRunnable;

        public PriorityRunnable(int priority, Runnable runnable) {
            this.mPriority = priority;
            this.mRunnable = runnable;
       }

        @Override
        public int compareTo(PriorityRunnable other) {
            return Integer.compare(other.mPriority, this.mPriority);
        }

        @Override
        public void run() {
            mRunnable.run();
        }
    }

    public void pause() {
        LOCK.lock();
        try {
            if (!isPaused) return;
            isPaused = false;
            pauseCondition.signalAll();
        } finally {
            LOCK.unlock();
        }

        Log.e(TAG, "ExecutorUtils is paused");
    }

    public void resume() {
        LOCK.lock();
        try {
            if (!isPaused) return;
            isPaused = false;
            pauseCondition.signalAll();
        } finally {
            LOCK.unlock();
        }
        Log.e(TAG, "ExecutorUtils is resumed");
    }
}

上一篇下一篇

猜你喜欢

热点阅读