多线程处理列表型数据任务

2024-02-03  本文已影响0人  倚仗听江

封装了个工具类,用于多线程处理列表型数据任务

/**
     * 
     *
     * @param threadWorkSize 每个线程处理的数据条数
     * @param data           数据列表
     * @param function       构建任务的方法
     * @param <T>
     * @return
     */
    public <T> List<Future<Collection<T>>> doTask(Integer threadWorkSize, List<T> data,
                                                  Function<List<T>, Callable<Collection<T>>> function) {
        log.info("开始执行多线程数据处理任务");
        //数据条数
        int dataSize = data.size();
        // 线程数
        int threadNum = dataSize / threadWorkSize + 1;
        log.info("线程数为{},数据条数为{}", threadNum, dataSize);
        // 定义标记,过滤dataSize / threadSize为整数 (当其为整数时最后一个线程无实际作用)
        boolean special = dataSize % threadWorkSize == 0;
        ExecutorService exec = Executors.newFixedThreadPool(threadNum);
        // 任务集合
        List<Callable<Collection<T>>> tasks = new ArrayList<>();
        List<T> cutList = null;
        for (int i = 0; i < threadNum; i++) {
            if (i == threadNum - 1) {
                if (special) {
                    break;
                }
                cutList = data.subList(threadWorkSize * i, dataSize);
            } else {
                cutList = data.subList(threadWorkSize * i, threadWorkSize * (i + 1));
            }
            final List<T> finalList = cutList;
            Callable<Collection<T>> task = function.apply(finalList);
            // 这里提交的任务容器列表和返回的Future列表存在顺序对应的关系
            tasks.add(task);
        }
        List<Future<Collection<T>>> futures = new ArrayList<>();
        try {
            futures = exec.invokeAll(tasks);
        } catch (InterruptedException e) {
            log.error("线程池处理失败", e);
        }
        exec.shutdown();
        return futures;
    }
上一篇 下一篇

猜你喜欢

热点阅读