一个多线程工具类

2022-01-25  本文已影响0人  你家门口的两朵云

依赖(Guava,仅用于通过 ThreadFactoryBuilder 为线程池中的线程命名)

        <dependency>
            <groupId>com.google.guava</groupId>
            <artifactId>guava</artifactId>
            <version>18.0</version>
        </dependency>

写一个工具类

package edu.hgnu.utils;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.List;
import java.util.concurrent.*;

/**
 * Static helper that runs {@link ParallerAction}s on one shared thread pool.
 *
 * <p>Original author's note: keep the number of tasks at or below 2048.
 * {@link #execute(ParallerAction)} enforces that figure with a global semaphore, while
 * {@link #forEach(List, ParallelOptions)} caps each call at 1024 concurrently running actions.
 *
 * <p>The pool does not enable core-thread time-out, so its core threads stay alive until
 * {@link #shutDown()} is called. NOTE(review): assuming the Guava-built factory creates non-daemon
 * threads (its default), a program using this class must call {@code shutDown()} once, after all
 * work has finished, for the JVM to exit - confirm.
 */
public class Paraller {

    /** Maximum number of tasks submitted through {@link #execute} that may be in flight at once. */
    private static final int MAX_GLOBAL_TASKS = 2048;

    /** Upper bound applied to the per-call concurrency of {@link #forEach}. */
    private static final int MAX_FOR_EACH_THREADS = 1024;

    /** Maximum number of threads the shared pool may create. */
    private static final int MAX_POOL_SIZE = 1024 * 20;

    /** Idle time after which threads above the core size are discarded. */
    private static final long KEEP_ALIVE_SECONDS = 5L;

    private static final ThreadFactory namedThreadFactory =
            new ThreadFactoryBuilder().setNameFormat("hbd-core-pool-%d").build();

    private static final Semaphore globalSemaphore = new Semaphore(MAX_GLOBAL_TASKS);

    // Common thread pool. A SynchronousQueue never stores tasks: every submission is handed
    // straight to an idle thread or to a newly created one, and is rejected (AbortPolicy) when
    // neither is possible. Threads above the core size expire after KEEP_ALIVE_SECONDS (5 s).
    private static final ExecutorService pool = new ThreadPoolExecutor(
            Runtime.getRuntime().availableProcessors(), MAX_POOL_SIZE,
            KEEP_ALIVE_SECONDS, TimeUnit.SECONDS,
            new SynchronousQueue<Runnable>(), namedThreadFactory, new ThreadPoolExecutor.AbortPolicy());

    /**
     * Shuts the shared pool down; tasks already submitted still run, new ones are rejected.
     *
     * <p>Not needed in most cases. Call it at most once, after all work has been submitted and
     * has completed - never from inside a running action, because every later submission through
     * this class would then be rejected.
     */
    public static void shutDown() {
        pool.shutdown();
    }

    /**
     * Submits one action to the shared pool.
     *
     * <p>Blocks while more than 2048 actions submitted through this method are still in flight.
     * Failures are reported with {@code printStackTrace()} and never propagate to the caller:
     * an exception thrown by the action, a rejection by the pool, or an interrupt while waiting
     * (in which case the thread's interrupt flag is restored and the action is not run).
     *
     * @param action the work to run on a pool thread
     */
    public static void execute(ParallerAction action) {
        submit(action, globalSemaphore, null);
    }

    /**
     * Runs all actions with the default concurrency (number of available processors) and waits
     * for them to finish.
     *
     * @param actions the actions to run; {@code null} or empty is a no-op
     */
    public static void forEach(final List<ParallerAction> actions) {
        forEach(actions, null);
    }

    /**
     * Runs all actions on the shared pool and blocks until every one of them has finished or has
     * been abandoned.
     *
     * <p>At most {@code threadCount} actions run at the same time. The value comes from
     * {@code parallelOptions}; a {@code null} option object or a non-positive count falls back to
     * the number of available processors, and the value is capped at 1024.
     *
     * <p>An action that cannot be submitted (pool shut down or saturated, or the calling thread
     * interrupted) is reported with {@code printStackTrace()} and counted as finished, so this
     * method cannot wait forever for work that will never run. Once the pool is shut down or the
     * caller is interrupted, the remaining actions are skipped.
     *
     * @param actions         the actions to run; {@code null} or empty is a no-op
     * @param parallelOptions concurrency settings, may be {@code null}
     */
    public static void forEach(final List<ParallerAction> actions, ParallelOptions parallelOptions) {

        if (actions == null || actions.isEmpty()) {
            return;
        }

        int threadCount = (parallelOptions == null || parallelOptions.threadCount <= 0)
                ? Runtime.getRuntime().availableProcessors()
                : parallelOptions.threadCount;
        threadCount = Math.min(threadCount, MAX_FOR_EACH_THREADS);

        final int total = actions.size();
        final CountDownLatch latch = new CountDownLatch(total);
        // Throttles how many actions of this call are inside the pool at the same time.
        final Semaphore semaphore = new Semaphore(threadCount);

        int handled = 0;
        for (final ParallerAction item : actions) {
            handled++;
            if (submit(item, semaphore, latch)) {
                continue;
            }
            // The action will never run, so account for it here instead of in the task.
            latch.countDown();
            if (Thread.currentThread().isInterrupted() || pool.isShutdown()) {
                // No later submission can succeed either; stop instead of failing once per item.
                break;
            }
        }
        // Account for the actions skipped by the early exit above (no-op on a complete pass).
        for (int i = handled; i < total; i++) {
            latch.countDown();
        }

        try {
            latch.await();
        } catch (InterruptedException ex) {
            // Keep the interrupt visible to the caller instead of silently clearing it.
            Thread.currentThread().interrupt();
            ex.printStackTrace();
        }
    }

    /**
     * Acquires one permit and hands the action to the pool.
     *
     * <p>The submitted task releases the permit and counts {@code done} down when it ends, whether
     * or not the action threw. If the action cannot be submitted, the permit is released here and
     * {@code done} is left untouched so the caller can decide how to account for it.
     *
     * @param action  the work to run
     * @param permits the semaphore limiting concurrency for this kind of submission
     * @param done    latch to count down when the task ends, or {@code null} if nobody is waiting
     * @return {@code true} if the pool accepted the task, {@code false} otherwise
     */
    private static boolean submit(final ParallerAction action, final Semaphore permits,
                                  final CountDownLatch done) {
        try {
            permits.acquire();
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            ex.printStackTrace();
            return false;
        }
        try {
            pool.execute(() -> {
                try {
                    action.action();
                } catch (Throwable ex) {
                    ex.printStackTrace();
                } finally {
                    permits.release();
                    if (done != null) {
                        done.countDown();
                    }
                }
            });
            return true;
        } catch (Throwable ex) {
            // The task was not accepted (e.g. RejectedExecutionException after shutDown()), so
            // its finally block will never run: give the permit back here.
            permits.release();
            ex.printStackTrace();
            return false;
        }
    }

    /** Settings for {@link #forEach(List, ParallelOptions)}. */
    public static class ParallelOptions {

        /** Number of actions allowed to run concurrently; values {@code <= 0} mean "use the default". */
        private int threadCount;

        public ParallelOptions() {

        }

        public ParallelOptions(int threadCount) {
            this.threadCount = threadCount;
        }

        public int getThreadCount() {
            return threadCount;
        }

        public void setThreadCount(int threadCount) {
            this.threadCount = threadCount;
        }

    }

    /** A unit of work that can be run on the shared pool. */
    @FunctionalInterface
    public interface ParallerAction {
        void action();
    }
}

使用多线程执行任务

/**
 * Demo: groups a fixed sample list of employees by employee id and processes each group as one
 * concurrent action through {@code Paraller.forEach}.
 *
 * <p>The shared pool is shut down exactly once, after {@code forEach} has returned. Shutting it
 * down from inside an action lets the first finished action close the pool while later groups are
 * still being submitted, so those groups are rejected.
 */
public void mulThreadRun(){

        List<Emp> list = new ArrayList<>();
        list.add(new Emp().setEmpId("444").setName("李六"));
        list.add(new Emp().setEmpId("111").setName("张三"));
        list.add(new Emp().setEmpId("222").setName("Mr z"));
        list.add(new Emp().setEmpId("111").setName("张四"));
        list.add(new Emp().setEmpId("222").setName("Mr c"));
        list.add(new Emp().setEmpId("333").setName("Korean"));
        list.add(new Emp().setEmpId("111").setName("张五"));
        list.add(new Emp().setEmpId("333").setName("3060 TI"));
        list.add(new Emp().setEmpId("111").setName("张六"));
        list.add(new Emp().setEmpId("333").setName("2060 TI"));
        list.add(new Emp().setEmpId("444").setName("李四"));
        list.add(new Emp().setEmpId("333").setName("1080 TI"));
        list.add(new Emp().setEmpId("444").setName("李五"));

        System.out.println(list);

        // A sequential stream is enough for a handful of elements; the concurrency of this demo
        // comes from Paraller, not from the grouping step.
        Map<String, List<Emp>> listMap = list.stream().collect(Collectors.groupingBy(Emp::getEmpId));
        System.out.println(listMap);

        // One action per group.
        List<Paraller.ParallerAction> actions = new ArrayList<>(listMap.size());
        for (Map.Entry<String, List<Emp>> group : listMap.entrySet()) {
            actions.add(new Action(group.getKey(), group.getValue()));
        }

        try {
            // Blocks until every group has been processed.
            Paraller.forEach(actions, null);
        } finally {
            // Release the pool threads only after all actions have finished.
            Paraller.shutDown();
        }
    }

    /**
     * Work executed for one group of employees.
     */
    private static class Action implements Paraller.ParallerAction {

        private final String keyF;
        private final List<Emp> listF;

        /**
         * @param key  the employee id shared by the group
         * @param list the employees belonging to that id
         */
        public Action(String key,List<Emp> list) {
            this.keyF = key;
            this.listF = list;
        }

        /**
         * Prints the group. It must not shut the shared pool down: sibling actions may still be
         * waiting to be submitted, and the pool rejects anything submitted after shutdown.
         */
        @Override
        public void action() {
            System.out.println(keyF+"-------"+listF.toString());
        }
    }

测试

public class TestMulThread {
    public static void main(String[] args) {

        MulThread mulThread = new MulThread();
        mulThread.mulThreadRun();
    }
}
上一篇 下一篇

猜你喜欢

热点阅读