实现guava线程池的优先级队列
定义一个线程类:
线程类需要实现Comparable接口,同时要定义需要比较的字段,
另外需要自己实现一个优先级的线程池如下:
package priority;
import com.google.common.collect.Lists;
import com.google.common.collect.Queues;
import com.google.common.util.concurrent.*;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.*;
import static com.google.common.base.Preconditions.checkArgument;
public class PriorityThreadPoolExecutorextends ThreadPoolExecutorimplements ListeningExecutorService {
public PriorityThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
long keepAliveTime, TimeUnit unit, BlockingQueue workQueue) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
}
public PriorityThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
long keepAliveTime, TimeUnit unit, BlockingQueue workQueue,
RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
}
public PriorityThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
long keepAliveTime, TimeUnit unit, BlockingQueue workQueue,
ThreadFactory threadFactory) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
}
public PriorityThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
long keepAliveTime, TimeUnit unit, BlockingQueue workQueue,
ThreadFactory threadFactory, RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
}
@Override
protected RunnableFuturenewTaskFor(Runnable runnable, T value) {
return new ComparableFutureTask(runnable, value);
}
@Override
protected RunnableFuturenewTaskFor(Callable callable) {
return new ComparableFutureTask(callable);
}
protected class ComparableFutureTask
extends FutureTaskimplements Comparable>,ListenableFuture {
private Objectobject;
// The execution list to hold our listeners.
private final ExecutionListexecutionList =new ExecutionList();
public ComparableFutureTask(Callable callable) {
super(callable);
object = callable;
}
public ComparableFutureTask(Runnable runnable, V result) {
super(runnable, result);
object = runnable;
}
@Override
@SuppressWarnings("unchecked")
public int compareTo(ComparableFutureTask o) {
if (this == o) {
return 0;
}
if (o ==null) {
return -1; // high priority
}
if (object !=null && o.object !=null) {
if (object.getClass().equals(o.object.getClass())) {
if (object instanceof Comparable) {
return ((Comparable)object).compareTo(o.object);
}
}
}
return 0;
}
@Override
public void addListener(Runnable listener, Executor exec) {
executionList.add(listener, exec);
}
/**
* Internal implementation detail used to invoke the listeners.
*/
@Override
protected void done() {
executionList.execute();
}
}
@Override public ListenableFuturesubmit(Runnable task) {
ComparableFutureTask ftask =new ComparableFutureTask(task, null);
execute(ftask);
return ftask;
}
@Override public ListenableFuturesubmit(Runnable task, T result) {
ComparableFutureTask ftask =new ComparableFutureTask(task, result);
execute(ftask);
return ftask;
}
@Override public ListenableFuturesubmit(Callable task) {
ComparableFutureTask ftask =new ComparableFutureTask(task);
execute(ftask);
return ftask;
}
@Override public T invokeAny(Collection> tasks)
throws InterruptedException, ExecutionException {
try {
return invokeAnyImpl(this, tasks, false, 0);
}catch (TimeoutException cannotHappen) {
throw new AssertionError();
}
}
@Override public T invokeAny(
Collection> tasks, long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
return invokeAnyImpl(this, tasks, true, unit.toNanos(timeout));
}
@Override public List>invokeAll(Collection> tasks)
throws InterruptedException {
if (tasks ==null) {
throw new NullPointerException();
}
List> futures =new ArrayList>(tasks.size());
boolean done =false;
try {
for (Callable t : tasks) {
ComparableFutureTask f =new ComparableFutureTask(t);
futures.add(f);
execute(f);
}
for (Future f : futures) {
if (!f.isDone()) {
try {
f.get();
}catch (CancellationException ignore) {
}catch (ExecutionException ignore) {
}
}
}
done =true;
return futures;
}finally {
if (!done) {
for (Future f : futures) {
f.cancel(true);
}
}
}
}
@Override public List>invokeAll(
Collection> tasks, long timeout, TimeUnit unit)
throws InterruptedException {
if (tasks ==null || unit ==null) {
throw new NullPointerException();
}
long nanos = unit.toNanos(timeout);
List> futures =new ArrayList>(tasks.size());
boolean done =false;
try {
for (Callable t : tasks) {
futures.add(new ComparableFutureTask(t));
}
long lastTime = System.nanoTime();
// Interleave time checks and calls to execute in case
// executor doesn't have any/much parallelism.
Iterator> it = futures.iterator();
while (it.hasNext()) {
execute((Runnable) (it.next()));
long now = System.nanoTime();
nanos -= now - lastTime;
lastTime = now;
if (nanos <=0) {
return futures;
}
}
for (Future f : futures) {
if (!f.isDone()) {
if (nanos <=0) {
return futures;
}
try {
f.get(nanos, TimeUnit.NANOSECONDS);
}catch (CancellationException ignore) {
}catch (ExecutionException ignore) {
}catch (TimeoutException toe) {
return futures;
}
long now = System.nanoTime();
nanos -= now - lastTime;
lastTime = now;
}
}
done =true;
return futures;
}finally {
if (!done) {
for (Future f : futures) {
f.cancel(true);
}
}
}
}
/*
* This following method is a modified version of one found in
* http://gee.cs.oswego.edu/cgi-bin/viewcvs.cgi/jsr166/src/test/tck/AbstractExecutorServiceTest.java?revision=1.30
* which contained the following notice:
*
* Written by Doug Lea with assistance from members of JCP JSR-166
* Expert Group and released to the public domain, as explained at
* http://creativecommons.org/publicdomain/zero/1.0/
* Other contributors include Andrew Wright, Jeffrey Hayes,
* Pat Fisher, Mike Judd.
*/
/**
* An implementation of {@link ExecutorService#invokeAny} for {@link ListeningExecutorService}
* implementations.
*/ static T invokeAnyImpl(ListeningExecutorService executorService,
Collection> tasks, boolean timed, long nanos)
throws InterruptedException, ExecutionException, TimeoutException {
int ntasks = tasks.size();
checkArgument(ntasks >0);
List> futures = Lists.newArrayListWithCapacity(ntasks);
BlockingQueue> futureQueue = Queues.newLinkedBlockingQueue();
// For efficiency, especially in executors with limited
// parallelism, check to see if previously submitted tasks are
// done before submitting more of them. This interleaving
// plus the exception mechanics account for messiness of main
// loop.
try {
// Record exceptions so that if we fail to obtain any
// result, we can throw the last exception we got.
ExecutionException ee =null;
long lastTime = timed ? System.nanoTime() :0;
Iterator> it = tasks.iterator();
futures.add(submitAndAddQueueListener(executorService, it.next(), futureQueue));
--ntasks;
int active =1;
for (;;) {
Future f = futureQueue.poll();
if (f ==null) {
if (ntasks >0) {
--ntasks;
futures.add(submitAndAddQueueListener(executorService, it.next(), futureQueue));
++active;
}else if (active ==0) {
break;
}else if (timed) {
f = futureQueue.poll(nanos, TimeUnit.NANOSECONDS);
if (f ==null) {
throw new TimeoutException();
}
long now = System.nanoTime();
nanos -= now - lastTime;
lastTime = now;
}else {
f = futureQueue.take();
}
}
if (f !=null) {
--active;
try {
return f.get();
}catch (ExecutionException eex) {
ee = eex;
}catch (RuntimeException rex) {
ee =new ExecutionException(rex);
}
}
}
if (ee ==null) {
ee =new ExecutionException(null);
}
throw ee;
}finally {
for (Future f : futures) {
f.cancel(true);
}
}
}
/**
* Submits the task and adds a listener that adds the future to {@code queue} when it completes.
*/
private static ListenableFuturesubmitAndAddQueueListener(
ListeningExecutorService executorService, Callable task,
final BlockingQueue> queue) {
final ListenableFuture future = executorService.submit(task);
future.addListener(new Runnable() {
@Override public void run() {
queue.add(future);
}
}, MoreExecutors.sameThreadExecutor());
return future;
}
}
测试方法如下:
public static void main(String[] args) {
PriorityBlockingQueue priorityQueue =new PriorityBlockingQueue(1000);
ThreadPoolExecutor threadPoolExecutor =new PriorityThreadPoolExecutor(2, 2, 1000, TimeUnit.MILLISECONDS,priorityQueue);
ListeningExecutorService processExecutorService = MoreExecutors.listeningDecorator(threadPoolExecutor);
for (int i =0; i <15; i++) {
Random random =new Random();
ProcessEntry threadA =new ProcessEntry(random.nextInt(30));
ListenableFuture submit = processExecutorService.submit(threadA);
}
System.out.println("完成");
try {
Thread.sleep(5000);
}catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("插入小的数字");
ProcessEntry threadb =new ProcessEntry(-1);
processExecutorService.submit(threadb);
System.out.println("插入小的数字2");
ProcessEntry threadb1 =new ProcessEntry(-30);
processExecutorService.submit(threadb1);
System.out.println("插入 大的数字");
ProcessEntry threadA =new ProcessEntry(200);
ListenableFuture processFuture = processExecutorService.submit(threadA);
Futures.addCallback(processFuture, new FutureCallback() {
@Override
public void onSuccess(Object o) {
System.out.println("200 成功");
}
@Override
public void onFailure(Throwable throwable) {
throwable.printStackTrace();
System.out.println("200 失败");
}
});
}
结果:
完成
priority:29
priority:6
插入小的数字
插入小的数字2
插入 大的数字
priority:23
priority:28
priority:23
priority:200
200 成功
priority:22
priority:22
priority:17
priority:16
priority:16
priority:15
priority:7
priority:3
priority:3
priority:1
priority:-1
priority:-30
可以看到是按照从大到小的顺序执行的线程