实现guava线程池的优先级队列

2020-07-29  本文已影响0人  新手党

定义一个线程类:

线程类需要实现Comparable接口,同时要定义需要比较的字段,

另外需要自己实现一个优先级的线程池如下:

package priority;

import com.google.common.collect.Lists;

import com.google.common.collect.Queues;

import com.google.common.util.concurrent.*;

import java.util.ArrayList;

import java.util.Collection;

import java.util.Iterator;

import java.util.List;

import java.util.concurrent.*;

import static com.google.common.base.Preconditions.checkArgument;

public class PriorityThreadPoolExecutorextends ThreadPoolExecutorimplements ListeningExecutorService {

public PriorityThreadPoolExecutor(int corePoolSize, int maximumPoolSize,

                                      long keepAliveTime, TimeUnit unit, BlockingQueue workQueue) {

super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);

    }

public PriorityThreadPoolExecutor(int corePoolSize, int maximumPoolSize,

                                      long keepAliveTime, TimeUnit unit, BlockingQueue workQueue,

                                      RejectedExecutionHandler handler) {

super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);

    }

public PriorityThreadPoolExecutor(int corePoolSize, int maximumPoolSize,

                                      long keepAliveTime, TimeUnit unit, BlockingQueue workQueue,

                                      ThreadFactory threadFactory) {

super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);

    }

public PriorityThreadPoolExecutor(int corePoolSize, int maximumPoolSize,

                                      long keepAliveTime, TimeUnit unit, BlockingQueue workQueue,

                                      ThreadFactory threadFactory, RejectedExecutionHandler handler) {

super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);

    }

@Override

    protected RunnableFuturenewTaskFor(Runnable runnable, T value) {

return new ComparableFutureTask(runnable, value);

    }

@Override

    protected RunnableFuturenewTaskFor(Callable callable) {

return new ComparableFutureTask(callable);

    }

protected class ComparableFutureTask

extends FutureTaskimplements Comparable>,ListenableFuture  {

private Objectobject;

        // The execution list to hold our listeners.

        private final ExecutionListexecutionList =new ExecutionList();

        public ComparableFutureTask(Callable callable) {

super(callable);

            object = callable;

        }

public ComparableFutureTask(Runnable runnable, V result) {

super(runnable, result);

            object = runnable;

        }

@Override

        @SuppressWarnings("unchecked")

public int compareTo(ComparableFutureTask o) {

if (this == o) {

return 0;

            }

if (o ==null) {

return -1; // high priority

            }

if (object !=null && o.object !=null) {

if (object.getClass().equals(o.object.getClass())) {

if (object instanceof Comparable) {

return ((Comparable)object).compareTo(o.object);

                    }

}

}

return 0;

        }

@Override

        public void addListener(Runnable listener, Executor exec) {

executionList.add(listener, exec);

        }

/**

* Internal implementation detail used to invoke the listeners.

*/

        @Override

        protected void done() {

executionList.execute();

        }

}

@Override public ListenableFuturesubmit(Runnable task) {

ComparableFutureTask ftask =new ComparableFutureTask(task, null);

        execute(ftask);

        return ftask;

    }

@Override public ListenableFuturesubmit(Runnable task, T result) {

ComparableFutureTask ftask =new  ComparableFutureTask(task, result);

        execute(ftask);

        return ftask;

    }

@Override public ListenableFuturesubmit(Callable task) {

ComparableFutureTask ftask =new ComparableFutureTask(task);

        execute(ftask);

        return ftask;

    }

@Override public T invokeAny(Collection> tasks)

throws InterruptedException, ExecutionException {

try {

return invokeAnyImpl(this, tasks, false, 0);

        }catch (TimeoutException cannotHappen) {

throw new AssertionError();

        }

}

@Override public T invokeAny(

Collection> tasks, long timeout, TimeUnit unit)

throws InterruptedException, ExecutionException, TimeoutException {

return invokeAnyImpl(this, tasks, true, unit.toNanos(timeout));

    }

@Override public List>invokeAll(Collection> tasks)

throws InterruptedException {

if (tasks ==null) {

throw new NullPointerException();

        }

List> futures =new ArrayList>(tasks.size());

        boolean done =false;

        try {

for (Callable t : tasks) {

ComparableFutureTask f =new ComparableFutureTask(t);

                futures.add(f);

                execute(f);

            }

for (Future f : futures) {

if (!f.isDone()) {

try {

f.get();

                    }catch (CancellationException ignore) {

}catch (ExecutionException ignore) {

}

}

}

done =true;

            return futures;

        }finally {

if (!done) {

for (Future f : futures) {

f.cancel(true);

                }

}

}

}

@Override public List>invokeAll(

Collection> tasks, long timeout, TimeUnit unit)

throws InterruptedException {

if (tasks ==null || unit ==null) {

throw new NullPointerException();

        }

long nanos = unit.toNanos(timeout);

        List> futures =new ArrayList>(tasks.size());

        boolean done =false;

        try {

for (Callable t : tasks) {

futures.add(new ComparableFutureTask(t));

            }

long lastTime = System.nanoTime();

            // Interleave time checks and calls to execute in case

// executor doesn't have any/much parallelism.

            Iterator> it = futures.iterator();

            while (it.hasNext()) {

execute((Runnable) (it.next()));

                long now = System.nanoTime();

                nanos -= now - lastTime;

                lastTime = now;

                if (nanos <=0) {

return futures;

                }

}

for (Future f : futures) {

if (!f.isDone()) {

if (nanos <=0) {

return futures;

                    }

try {

f.get(nanos, TimeUnit.NANOSECONDS);

                    }catch (CancellationException ignore) {

}catch (ExecutionException ignore) {

}catch (TimeoutException toe) {

return futures;

                    }

long now = System.nanoTime();

                    nanos -= now - lastTime;

                    lastTime = now;

                }

}

done =true;

            return futures;

        }finally {

if (!done) {

for (Future f : futures) {

f.cancel(true);

                }

}

}

}

/*

* This following method is a modified version of one found in

* http://gee.cs.oswego.edu/cgi-bin/viewcvs.cgi/jsr166/src/test/tck/AbstractExecutorServiceTest.java?revision=1.30

* which contained the following notice:

*

* Written by Doug Lea with assistance from members of JCP JSR-166

* Expert Group and released to the public domain, as explained at

* http://creativecommons.org/publicdomain/zero/1.0/

* Other contributors include Andrew Wright, Jeffrey Hayes,

* Pat Fisher, Mike Judd.

*/

    /**

    * An implementation of {@link ExecutorService#invokeAny} for {@link ListeningExecutorService}

* implementations.

    */ static T invokeAnyImpl(ListeningExecutorService executorService,

                                  Collection> tasks, boolean timed, long nanos)

throws InterruptedException, ExecutionException, TimeoutException {

int ntasks = tasks.size();

        checkArgument(ntasks >0);

        List> futures = Lists.newArrayListWithCapacity(ntasks);

        BlockingQueue> futureQueue = Queues.newLinkedBlockingQueue();

        // For efficiency, especially in executors with limited

// parallelism, check to see if previously submitted tasks are

// done before submitting more of them. This interleaving

// plus the exception mechanics account for messiness of main

// loop.

        try {

// Record exceptions so that if we fail to obtain any

// result, we can throw the last exception we got.

            ExecutionException ee =null;

            long lastTime = timed ? System.nanoTime() :0;

            Iterator> it = tasks.iterator();

            futures.add(submitAndAddQueueListener(executorService, it.next(), futureQueue));

            --ntasks;

            int active =1;

            for (;;) {

Future f = futureQueue.poll();

                if (f ==null) {

if (ntasks >0) {

--ntasks;

                        futures.add(submitAndAddQueueListener(executorService, it.next(), futureQueue));

                        ++active;

                    }else if (active ==0) {

break;

                    }else if (timed) {

f = futureQueue.poll(nanos, TimeUnit.NANOSECONDS);

                        if (f ==null) {

throw new TimeoutException();

                        }

long now = System.nanoTime();

                        nanos -= now - lastTime;

                        lastTime = now;

                    }else {

f = futureQueue.take();

                    }

}

if (f !=null) {

--active;

                    try {

return f.get();

                    }catch (ExecutionException eex) {

ee = eex;

                    }catch (RuntimeException rex) {

ee =new ExecutionException(rex);

                    }

}

}

if (ee ==null) {

ee =new ExecutionException(null);

            }

throw ee;

        }finally {

for (Future f : futures) {

f.cancel(true);

            }

}

}

/**

    * Submits the task and adds a listener that adds the future to {@code queue} when it completes.

*/

    private static ListenableFuturesubmitAndAddQueueListener(

ListeningExecutorService executorService, Callable task,

            final BlockingQueue> queue) {

final ListenableFuture future = executorService.submit(task);

        future.addListener(new Runnable() {

@Override public void run() {

queue.add(future);

            }

}, MoreExecutors.sameThreadExecutor());

        return future;

    }

}

测试方法如下:

public static void main(String[] args) {

PriorityBlockingQueue priorityQueue =new PriorityBlockingQueue(1000);

    ThreadPoolExecutor threadPoolExecutor =new PriorityThreadPoolExecutor(2, 2, 1000, TimeUnit.MILLISECONDS,priorityQueue);

    ListeningExecutorService processExecutorService = MoreExecutors.listeningDecorator(threadPoolExecutor);

    for (int i =0; i <15; i++) {

Random random =new Random();

        ProcessEntry threadA =new ProcessEntry(random.nextInt(30));

        ListenableFuture submit = processExecutorService.submit(threadA);

    }

System.out.println("完成");

    try {

Thread.sleep(5000);

    }catch (InterruptedException e) {

e.printStackTrace();

    }

System.out.println("插入小的数字");

    ProcessEntry threadb =new ProcessEntry(-1);

    processExecutorService.submit(threadb);

    System.out.println("插入小的数字2");

    ProcessEntry threadb1 =new ProcessEntry(-30);

    processExecutorService.submit(threadb1);

    System.out.println("插入 大的数字");

    ProcessEntry threadA =new ProcessEntry(200);

    ListenableFuture processFuture = processExecutorService.submit(threadA);

    Futures.addCallback(processFuture, new FutureCallback() {

@Override

        public void onSuccess(Object o) {

System.out.println("200 成功");

        }

@Override

        public void onFailure(Throwable throwable) {

throwable.printStackTrace();

            System.out.println("200 失败");

        }

});

}

结果:

完成

priority:29

priority:6

插入小的数字

插入小的数字2

插入 大的数字

priority:23

priority:28

priority:23

priority:200

200 成功

priority:22

priority:22

priority:17

priority:16

priority:16

priority:15

priority:7

priority:3

priority:3

priority:1

priority:-1

priority:-30

可以看到是按照从大到小的顺序执行的线程

上一篇 下一篇

猜你喜欢

热点阅读