多线程处理数据,限制并发数量,等待多线程执行结束反馈结果

2022-12-12  本文已影响0人  承诺一时的华丽
 public void pushDataTask() throws InterruptedException {
        logger.debug("===================>> 启动推送数据任务.. <<===================");
        ExecutorService executorService = Executors.newFixedThreadPool(1);
        // 可实现控制线程同时执行的数量 
        Semaphore semaphore = new Semaphore(2);

        // 消息推送
        LambdaQueryWrapper<Sys_DataPushConfig> pushConfigWrapper = new LambdaQueryWrapper<>();
        pushConfigWrapper.eq(Sys_DataPushConfig::getState, StateType.启用.type);
        List<Sys_DataPushConfig> pushConfigList = pushConfigService.selectList(pushConfigWrapper);
        if (pushConfigList == null || pushConfigList.isEmpty()) {
            // 无推送应用
            logger.debug("===================>> 无推送应用数量。 <<===================");
            return;
        }

        logger.debug("===================>> 推送应用数量:" + pushConfigList.size());

        // 推送数据
        LambdaQueryWrapper<Sys_DataPushRecord> dataPushRecordWrapper = new LambdaQueryWrapper<>();
        dataPushRecordWrapper.eq(Sys_DataPushRecord::getProcessStatus, ProcessStatusType.待处理.type);
        List<Sys_DataPushRecord> dataPushRecords = dataPushRecordService.selectList(dataPushRecordWrapper);

        logger.debug("===================>> 推送的数据量:" + dataPushRecords.size());
        for (Sys_DataPushRecord record : dataPushRecords) {
            // 可等待pushConfigList执行完后,再执行下一条数据,保证对多个应用数据处理结果的一致性
            CountDownLatch latch = new CountDownLatch(pushConfigList.size());
            for (Sys_DataPushConfig config : pushConfigList) {
                logger.debug("===================>> 数据处理:" + dataPushRecords.size());
                Runnable runnable = () -> {
                    try {
                        semaphore.acquire();
                        // 开始处理数据
                        logger.debug(Thread.currentThread().getName() + "------------------" + semaphore.availablePermits() + "\t" + "开始处理数据:config=>" + config.getAppName() + ",record:" + JSONUtil.toJsonStr(record));
                        Thread.sleep(1000);
                        semaphore.release();
                        latch.countDown();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                };
                executorService.execute(runnable);

            }
            latch.await();
            // 更新该条数据的处理最终结果
            // ...

        }
        executorService.shutdown();
        logger.debug("===================>> 结束推送数据任务。 <<===================");
    }
上一篇 下一篇

猜你喜欢

热点阅读