线程池工具类
2021-04-24 本文已影响0人
眼中有码
/**
 * Process-wide, priority-aware thread pool that can be paused and resumed.
 *
 * <p>Tasks are wrapped in {@link PriorityRunnable} and queued in a
 * {@link PriorityBlockingQueue}; a larger priority value is dequeued first.
 * Tasks with equal priority have no guaranteed relative order.
 *
 * <p>While the pool is paused, worker threads block in {@code beforeExecute}
 * before starting their next task; tasks that are already running are not
 * affected.
 */
public final class ExecutorUtils {
    private static final String TAG = "ExecutorUtils";

    /** Guards {@link #isPaused} transitions and owns {@link #pauseCondition}. */
    private static final ReentrantLock LOCK = new ReentrantLock();

    /** Shared main-thread handler used to deliver {@link Callable} callbacks. */
    private static final Handler MAIN_HANDLER = new Handler(Looper.getMainLooper());

    private static volatile ExecutorUtils instance;

    /** Signalled by {@link #resume()} to release workers parked in beforeExecute. */
    private final Condition pauseCondition = LOCK.newCondition();

    private final ThreadPoolExecutor executor;

    /**
     * Written only while holding {@link #LOCK}; volatile so the unlocked
     * fast-path read in beforeExecute sees the latest value.
     */
    private volatile boolean isPaused = false;

    /**
     * Returns the lazily created singleton instance.
     *
     * @return the shared {@code ExecutorUtils}
     */
    public static ExecutorUtils getInstance() {
        if (instance == null) {
            synchronized (ExecutorUtils.class) {
                if (instance == null) {
                    instance = new ExecutorUtils();
                }
            }
        }
        return instance;
    }

    private ExecutorUtils() {
        executor = createExecutor();
    }

    /** Builds the underlying pool with pause support and completion logging. */
    private ThreadPoolExecutor createExecutor() {
        int cpuCount = Runtime.getRuntime().availableProcessors();
        int corePoolSize = cpuCount + 1;
        // NOTE: the work queue below is unbounded, so ThreadPoolExecutor never
        // creates threads beyond corePoolSize; maxPoolSize has no practical effect.
        int maxPoolSize = cpuCount * 2 + 1;
        PriorityBlockingQueue<Runnable> blockingQueue = new PriorityBlockingQueue<Runnable>();
        long keepAliveTime = 30L;
        TimeUnit unit = TimeUnit.SECONDS;
        AtomicLong seq = new AtomicLong();
        ThreadFactory threadFactory = r -> {
            Thread thread = new Thread(r);
            thread.setName("ExecutorUtils-" + seq.getAndIncrement());
            return thread;
        };
        return new ThreadPoolExecutor(
                corePoolSize,
                maxPoolSize,
                keepAliveTime,
                unit,
                blockingQueue,
                threadFactory
        ) {
            @Override
            protected void beforeExecute(Thread t, Runnable r) {
                super.beforeExecute(t, r);
                if (!isPaused) {
                    // Fast path: avoid taking the lock when not paused.
                    return;
                }
                LOCK.lock();
                try {
                    // Loop to tolerate spurious wakeups and a resume() that
                    // happened between the fast-path check and lock acquisition.
                    while (isPaused) {
                        pauseCondition.await();
                    }
                } catch (InterruptedException e) {
                    // Preserve the interrupt for the task / pool instead of swallowing it.
                    t.interrupt();
                } finally {
                    LOCK.unlock();
                }
            }

            @Override
            protected void afterExecute(Runnable r, Throwable t) {
                super.afterExecute(r, t);
                // Hook for monitoring: long-running tasks, threads created, tasks currently running.
                Log.e(TAG, "已执行完的任务的优先级是:" + ((PriorityRunnable) r).mPriority);
            }
        };
    }

    /**
     * Submits a task with the default priority (0).
     *
     * @param runnable the task to run on a pool thread
     */
    public void execute(Runnable runnable) {
        this.execute(0, runnable);
    }

    /**
     * Submits a task with the given priority.
     *
     * @param priority larger values run earlier; {@code null} is treated as 0
     * @param runnable the task to run on a pool thread
     */
    public void execute(@IntRange(from = 0, to = 10) Integer priority, Runnable runnable) {
        enqueue(priority, runnable);
    }

    /**
     * Submits a {@link Callable} with the default priority (0).
     *
     * @param runnable the task whose callbacks are delivered on the main thread
     * @param <T>      result type produced by {@link Callable#onBackground()}
     */
    public <T> void execute(Callable<T> runnable) {
        this.execute(0, runnable);
    }

    /**
     * Submits a {@link Callable} with the given priority.
     *
     * @param priority larger values run earlier; {@code null} is treated as 0
     * @param runnable the task whose callbacks are delivered on the main thread
     * @param <T>      result type produced by {@link Callable#onBackground()}
     */
    public <T> void execute(@IntRange(from = 0, to = 10) Integer priority, Callable<T> runnable) {
        enqueue(priority, runnable);
    }

    /** Wraps the task with its priority and hands it to the pool. */
    private void enqueue(Integer priority, Runnable task) {
        // Avoid an unboxing NullPointerException; fall back to the default priority.
        int effectivePriority = priority == null ? 0 : priority;
        executor.execute(new PriorityRunnable(effectivePriority, task));
    }

    /**
     * A background task with main-thread callbacks: {@link #onPrepare()} is posted
     * to the main thread, {@link #onBackground()} runs on the pool thread, and
     * {@link #onCompleted(Object)} is posted to the main thread with the result.
     *
     * @param <T> result type
     */
    public abstract static class Callable<T> implements Runnable {
        @Override
        public void run() {
            // Keep a reference so that exactly this callback can be cancelled later.
            final Runnable prepare = this::onPrepare;
            MAIN_HANDLER.post(prepare);
            final T result = onBackground();
            // If onPrepare has not run yet by the time the work is finished, it is
            // no longer needed. Remove only this task's callback: the handler is
            // shared, and clearing everything would drop other tasks' callbacks.
            MAIN_HANDLER.removeCallbacks(prepare);
            MAIN_HANDLER.post(() -> onCompleted(result));
        }

        /** Called on the main thread before the result is delivered; may be skipped. */
        public void onPrepare() {
        }

        /**
         * Performs the work on a pool thread.
         *
         * @return the value passed to {@link #onCompleted(Object)}
         */
        protected abstract T onBackground();

        /**
         * Called on the main thread with the result of {@link #onBackground()}.
         *
         * @param t the background result
         */
        public abstract void onCompleted(T t);
    }

    /** Pairs a task with a priority so the queue can order it; higher runs first. */
    static class PriorityRunnable implements Runnable, Comparable<PriorityRunnable> {
        private final int mPriority;
        private final Runnable mRunnable;

        public PriorityRunnable(int priority, Runnable runnable) {
            this.mPriority = priority;
            this.mRunnable = runnable;
        }

        @Override
        public int compareTo(PriorityRunnable other) {
            // Reversed comparison: a larger priority sorts first in the queue.
            return Integer.compare(other.mPriority, this.mPriority);
        }

        @Override
        public void run() {
            mRunnable.run();
        }
    }

    /**
     * Pauses the pool: workers block before starting their next task until
     * {@link #resume()} is called. Running tasks are not interrupted.
     * Does nothing if the pool is already paused.
     */
    public void pause() {
        LOCK.lock();
        try {
            if (isPaused) {
                return;
            }
            isPaused = true;
        } finally {
            LOCK.unlock();
        }
        Log.e(TAG, "ExecutorUtils is paused");
    }

    /**
     * Resumes a paused pool and wakes every worker blocked by {@link #pause()}.
     * Does nothing if the pool is not paused.
     */
    public void resume() {
        LOCK.lock();
        try {
            if (!isPaused) {
                return;
            }
            isPaused = false;
            pauseCondition.signalAll();
        } finally {
            LOCK.unlock();
        }
        Log.e(TAG, "ExecutorUtils is resumed");
    }
}