多线程实现批量更新

2022-06-26  本文已影响0人  c_gentle

相信不少开发者在遇到项目对数据进行批量操作的时候,都会有不少的烦恼,尤其是针对数据量极大的情况下,效率问题就直接提上了菜板。

因此,开多线程来执行批量任务是十分重要的一种批量操作思路,其实这种思路实现起来也十分简单,就拿批量更新的操作举例。
整体流程如下:


image.png

步骤如下:

 public static <T> List<List<T>> split(List<T> resList, int subListLength) {
        if (resList.isEmpty() || subListLength <= 0) {
            return new ArrayList<>();
        }
        List<List<T>> ret = new ArrayList<>();
        int size = resList.size();
        if (size <= subListLength) {
            // 数据量不足 subListLength 指定的大小
            ret.add(resList);
        } else {
            int pre = size / subListLength;
            int last = size % subListLength;
            // 前面pre个集合,每个大小都是 subListLength 个元素
            for (int i = 0; i < pre; i++) {
                List<T> itemList = new ArrayList<>();
                for (int j = 0; j < subListLength; j++) {
                    itemList.add(resList.get(i * subListLength + j));
                }
                ret.add(itemList);
            }
            //last的处理
            if (last > 0) {
                List<T> itemList = new ArrayList<>();
                for (int i = 0; i < last; i++) {
                    itemList.add(resList.get(pre * subListLength + i));
                }
                ret.add(itemList);
            }

        }
        return ret;
    }

开启异步执行任务的线程池:


    //开启多线程
    public void threadMethod() {
        List<T> updateList = new ArrayList();
        //初始化线程池
        ThreadPoolExecutor threadPool = new ThreadPoolExecutor(20, 50, 4,
                TimeUnit.SECONDS, new ArrayBlockingQueue<>(10), new ThreadPoolExecutor.AbortPolicy());
        // 大集合拆分成N个小集合, 这里集合的size可以稍微小一些(这里我用100刚刚好), 以保证多线程异步执行, 过大容易回到单线程
        List<T> splitNList = SplitListUtils.split(totalList, 100);
        // 记录单个任务的执行次数
        CountDownLatch countDownLatch = new CountDownLatch(splitNList.size());
        // 对拆分的集合进行批量处理, 先拆分的集合, 再多线程执行
        splitNList.stream().forEach(o -> {
            //线程池执行
            threadPool.execute(new Thread(new Runnable() {
                @Override
                public void run() {
                    for (Entity yangshiwen : singleList) {
                        // 将每一个对象进行数据封装, 并添加到一个用于存储更新数据的list
                        // ......
                    }
                }
            }));
            // 任务个数 - 1, 直至为0时唤醒await()
            countDownLatch.countDown();
        });
        try {
            countDownLatch.await();
        } catch (Exception e) {
            e.printStackTrace();
        }
        // 通过mybatis的批量插入的方式来进行数据的插入, 这一步还是要做判空
        if (!updateList.isEmpty()) {
            batchUpdateEntity(updateList);
        }

    }
上一篇下一篇

猜你喜欢

热点阅读