JDK 线程池

2019-08-25  本文已影响0人  大白乐了

Executors

/**
 * Demonstrates the life cycle of a hand-built {@link ThreadPoolExecutor}:
 * submit a few tasks, shut the pool down, and print its state before and
 * after termination.
 */
public class MyThreadPool {

    /**
     * Submits six short tasks to a pool with five core threads, then shuts
     * the pool down and prints {@code isTerminated()} / {@code isShutdown()}
     * before and after waiting for termination.
     *
     * @param args unused
     * @throws InterruptedException if the main thread is interrupted while
     *                              waiting for the pool to terminate
     */
    public static void main(String[] args) throws InterruptedException {
        // NOTE: the work queue is unbounded, so per the ThreadPoolExecutor
        // documentation no more than corePoolSize (5) threads are ever
        // created and maximumPoolSize (10) has no effect. This makes the pool
        // behave like Executors.newFixedThreadPool(5) with a custom factory.
        ExecutorService service = new ThreadPoolExecutor(5, 10,
                0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(), new MyThreadFactory("mypool-"));

        for (int i = 0; i < 6; i++) {
            service.execute(() -> {
                try {
                    System.out.println(Thread.currentThread().getName());
                    TimeUnit.MILLISECONDS.sleep(500);
                } catch (InterruptedException e) {
                    // Restore the interrupt status so the worker thread can
                    // observe that it was asked to stop.
                    Thread.currentThread().interrupt();
                }
            });
        }

        System.out.println(service);

        // shutdown() stops accepting new tasks but lets queued ones finish.
        service.shutdown();

        System.out.println(service.isTerminated());
        System.out.println(service.isShutdown());

        // Wait for the submitted tasks to finish (at most 5 seconds) instead
        // of sleeping for a fixed amount of time.
        service.awaitTermination(5, TimeUnit.SECONDS);
        System.out.println(service.isTerminated());
        System.out.println(service.isShutdown());
        System.out.println(service);
    }
}

/**
 * Thread factory that names each new thread with a fixed prefix followed by
 * a running number starting at 1.
 */
class MyThreadFactory implements ThreadFactory {

    /** Source of the numeric suffix; the first thread created gets 1. */
    private final AtomicInteger nextId = new AtomicInteger(1);

    /** Text placed in front of the numeric suffix in every thread name. */
    private final String namePrefix;

    /**
     * @param threadName prefix used for the name of every created thread
     */
    MyThreadFactory(String threadName) {
        namePrefix = threadName;
    }

    /**
     * Creates a new, not yet started thread for the given task.
     *
     * @param r task the thread will run
     * @return a thread named {@code <prefix><n>}
     */
    @Override
    public Thread newThread(Runnable r) {
        final int id = nextId.getAndIncrement();
        final String name = namePrefix + id;
        return new Thread(r, name);
    }
}

WorkStealingPool

/**
 * Small experiments with {@link Executors#newWorkStealingPool()}.
 */
public class WorkStealingPoolTest {

    /**
     * Number of tasks submitted by each test.
     */
    private static final int threads = 10;

    /**
     * Counted down once per task in {@link #test1()} so the test can wait
     * until every task has run. Sized from {@link #threads} so the two can
     * never drift apart.
     */
    CountDownLatch countDownLatch = new CountDownLatch(threads);

    /**
     * newWorkStealingPool + execute: runs {@code threads} tasks that print
     * the name of the worker thread, and waits for all of them via the latch.
     *
     * @throws InterruptedException if interrupted while waiting on the latch
     */
    @Test
    public void test1() throws InterruptedException {
        System.out.println("---start---");
        ExecutorService executorService = Executors.newWorkStealingPool();
        try {
            for (int i = 0; i < threads; i++) {
                executorService.execute(() -> {
                    try {
                        System.out.println(Thread.currentThread().getName());
                    } finally {
                        // Always count down, otherwise await() below would
                        // block forever if a task failed.
                        countDownLatch.countDown();
                    }
                });
            }
            countDownLatch.await();
        } finally {
            // Release the pool's worker threads once the test is done.
            executorService.shutdown();
        }
        System.out.println("---end---");
    }

    /**
     * newWorkStealingPool + submit: wraps a callable in a {@link FutureTask},
     * submits it and prints its result (the worker thread's name). Each
     * result is fetched before the next task is submitted, so the tasks run
     * one after another.
     *
     * @throws ExecutionException   if a task fails
     * @throws InterruptedException if interrupted while waiting for a result
     */
    @Test
    public void test3() throws ExecutionException, InterruptedException {
        System.out.println("--- start ---");
        ExecutorService service = Executors.newWorkStealingPool();
        try {
            for (int i = 0; i < threads; i++) {
                FutureTask<String> futureTask =
                        new FutureTask<>(() -> Thread.currentThread().getName());

                service.submit(futureTask);

                // get() blocks until this task has finished.
                System.out.println(futureTask.get());
            }
        } finally {
            service.shutdown();
        }
        System.out.println("--- end ---");

        // For reference: the no-arg work-stealing pool uses the number of
        // available processors as its target parallelism level.
        System.out.println(Runtime.getRuntime().availableProcessors());
    }

}

ForkJoinPool

/**
 * Sums a large random array with a {@link RecursiveAction}. Because a
 * {@code RecursiveAction} has no result, every leaf task prints the partial
 * sum of its own range.
 */
public class ForkJoinTest {

    /** Data to sum; filled with random values in [0, 100) by the static initializer. */
    static int[] nums = new int[1000000];
    /** Largest range (in elements) that a task sums directly instead of splitting. */
    static final int MAX_NUM = 50000;
    static Random r = new Random();

    static {
        for (int i = 0; i < nums.length; i++) {
            nums[i] = r.nextInt(100);
        }

        // Approach 1: single-threaded stream sum, printed as the reference total.
        System.out.println(Arrays.stream(nums).sum());
    }

    /**
     * Sums {@code nums[start, end)}; splits the range in half while it is
     * larger than {@link #MAX_NUM}.
     */
    static class AddTask extends RecursiveAction {

        /** Range handled by this task: start inclusive, end exclusive. */
        int start, end;

        AddTask(int start, int end) {
            this.start = start;
            this.end = end;
        }

        /**
         * Sums the range directly when it is small enough, otherwise runs
         * both halves as subtasks and waits for them.
         */
        @Override
        protected void compute() {
            if (end - start <= MAX_NUM) {
                long sum = 0L;
                for (int i = start; i < end; i++) {
                    sum += nums[i];
                }
                // Report the partial sum; previously it was computed and discarded.
                System.out.println("from:" + start + " to:" + end + " = " + sum);
                return;
            }

            int middle = start + (end - start) / 2;

            // invokeAll forks the subtasks AND waits for both, so this task
            // only completes once its whole range has been processed.
            invokeAll(new AddTask(start, middle), new AddTask(middle, end));
        }
    }

    /**
     * Runs the summation and returns once every leaf task has finished.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // invoke() blocks until the task (and, through invokeAll, all of its
        // subtasks) has completed, so there is no need to keep the JVM alive
        // by blocking on System.in while daemon worker threads finish.
        new AddTask(0, nums.length).invoke();
    }
}

/**
 * Sums a large random array with a {@link RecursiveTask}, which, unlike a
 * {@code RecursiveAction}, returns the result of each subtask to its parent.
 */
public class ForkJoinTest2 {

    /** Data to sum; filled with random values in [0, 100) by the static initializer. */
    static int[] nums = new int[1000000];
    /** Largest range (in elements) that a task sums directly instead of splitting. */
    static final int MAX_NUM = 50000;
    static Random r = new Random();

    static {
        for (int i = 0; i < nums.length; i++) {
            nums[i] = r.nextInt(100);
        }

        // Approach 1: single-threaded stream sum, printed as the reference total.
        System.out.println(Arrays.stream(nums).sum());
    }

    /**
     * Computes the sum of {@code nums[start, end)}; splits the range in half
     * while it is larger than {@link #MAX_NUM}.
     */
    static class AddTask extends RecursiveTask<Long> {

        /** Range handled by this task: start inclusive, end exclusive. */
        int start, end;

        AddTask(int start, int end) {
            this.start = start;
            this.end = end;
        }

        /**
         * @return the sum of all elements in this task's range
         */
        @Override
        protected Long compute() {

            if (end - start <= MAX_NUM) {
                long sum = 0L;
                for (int i = start; i < end; i++) {
                    sum += nums[i];
                }
                System.out.println("from: " + start + " to:" + end);
                return sum;
            }

            int middle = start + (end - start) / 2;

            AddTask subTask1 = new AddTask(start, middle);
            AddTask subTask2 = new AddTask(middle, end);

            // Fork only one half and compute the other in the current worker,
            // then join the forked half. This keeps the current thread busy
            // and joins in innermost-first order.
            subTask1.fork();
            long rightSum = subTask2.compute();
            return subTask1.join() + rightSum;
        }
    }

    /**
     * Sums the array on a dedicated pool and prints the result.
     *
     * @param args unused
     * @throws IOException never thrown; kept so existing callers still compile
     */
    public static void main(String[] args) throws IOException {
        ForkJoinPool fjp = new ForkJoinPool();
        try {
            // invoke() submits the task and waits for its result.
            long result = fjp.invoke(new AddTask(0, nums.length));
            System.out.println(result);
        } finally {
            fjp.shutdown();
        }
    }
}

上一篇下一篇

猜你喜欢

热点阅读