多线程超时处理

2018-01-16  本文已影响0人  zcoljefe

背景:多个线程同时执行某一类任务,在规定时间内未完成,则终止未完成的线程继续执行,防止系统资源的浪费和用户的长时间等待,并获取已完成线程的返回结果。
方案:使用计数器 CountDownLatch 的 await(long timeout, TimeUnit unit) 方法。

public class Main {
    /* 启动线程数 */
    private static final int THREAD_COUNT = 10; 

    public static void main(String[] args) {
        /* 线程池 */
        ExecutorService ex = Executors.newFixedThreadPool(THREAD_COUNT);
        /* 计数器 */
        CountDownLatch latch = new CountDownLatch(THREAD_COUNT);
        List<Future<String>> tasks = new ArrayList<>();
        /* 启动 THREAD_COUNT 子线程 */
        IntStream.range(0, THREAD_COUNT).forEach(i -> tasks.add(ex.submit(() -> planThread(i, latch))));
        /* 启动监听线程 */
        new Thread(() -> timoutListener(latch, 5000, tasks)).start();
        //主线程处理其它任务......
        /* 获取多线程处理结果 */
        for (int i = 0; i < tasks.size(); i++) {
            Future<String> f = tasks.get(i);
            try {
                System.out.println("Main线程:Thread 【" + i + "】返回:" + f.get());
            }  catch (Exception e) {
                System.out.println("Main线程:Thread 【" + i + "】被取消,无返回结果");
            }
        }
        ex.shutdown();
    }

    /* 任务处理线程 */
    private static String planThread(int i, CountDownLatch latch) {
        try {
            System.out.println("===线程" + i + "开始执行");
            String s = "";
            int m = 0;

            while(m < Integer.MAX_VALUE){
                if(Thread.currentThread().isInterrupted()){
                    System.out.println("线程被中断");
                    return null;
                }
                if(m%10000 == 0){
                    System.out.println(m);
                }
                s += m;
                m ++;
            }
            latch.countDown();
            System.out.println("线程" + i + "执行时完成===");
            return "" + i;
        } catch(CancellationException e){
            e.printStackTrace();
        }
        return null;
    }

    /* 超时监听线程 */
    public static void timoutListener(CountDownLatch latch, long timeOutMillion, List<Future<String>> tasks){
        try {
            /* 如果计数器为0(所有任务完成)或超过时间,则向下执行 */
            latch.await(timeOutMillion, TimeUnit.MILLISECONDS);
            for (int i = 0; i < tasks.size(); i++) {
                Future<String> f = tasks.get(i);
                if (!f.isDone()) {
                    System.out.println("监听线程:终止Thread 【" + i + "】");
                    f.cancel(true);
                } else {
                    System.out.println("监听线程:Thread 【" + i + "】已正常结束");
                }
            }
            System.out.println("监听线程结束!");
        } catch (InterruptedException e) {
            System.out.println("监听线程抛出中断异常");
        }
    }
}

另外:
Future 的 get(long timeout, TimeUnit unit) 方法:是在调用 get 方法时开始记时,等待指定时间,并非是从任务开始计时。并且等待第一个线程超时后,第二线程个调用 get 重新计时,无法保证所有线程在指定时间内完成。
CyclicBarrier 的 await(long timeout, TimeUnit unit) 用于一个子线程等待其它子线程的时间,主要用于线程间同步。

上一篇下一篇

猜你喜欢

热点阅读