《JAVA并发编程实战》示例程序

线程安全10 - 高并发请求 缩减selectOne --> s

2020-01-07  本文已影响0人  小超_8b2f

1. 根据code、clinicId 两个参数查询版本

\color{red}{(注意:没有查询到数据时会阻塞,待修正)}

@Override
    public Param selectParamNew(Param param) {
        String id = SessionUtil.getClinicId() + "|" + param.getCode();
        Request request = new Request(UUID.randomUUID().toString(),id,SessionUtil.getClinicId(),param.getCode(),new CompletableFuture<Param>());
        queue.add(request);
        Param result = null;
        try{
            result = request.future.get();
        } catch ( InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
        return result;
    }

    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    private class Request {
        String serialNo;
        String id;
        Integer clinicId;
        String code;
        CompletableFuture<Param> future;
    }

    LinkedBlockingDeque<Request> queue = new LinkedBlockingDeque();



    @PostConstruct
    public void init() {
        ThreadPoolUtils t;
        ScheduledExecutorService pool = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors() * 2 - 1);
        User user = SessionUtil.getCurrentUser();

        //每隔10毫秒根据队列里堆积的查询id执行一次批量查询
        pool.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                SessionUtil.setContext(user,"hello world");
                try {
                    List<Request> requests = new ArrayList<>();
                    Map<String, Request> serialNoRequestMap = new HashMap<>();
                    Set<String> ids = new HashSet<>();
                    if (queue.isEmpty()) {
                        return;
                    }
                    while (!queue.isEmpty()) {
                        Request request = queue.pop();
                        requests.add(request);
                        serialNoRequestMap.put(request.getSerialNo(), request);
                        ids.add(request.getId());
                    }
                    Map<String, Param> serialNoUserMap = selectBatch(requests);

                    for (Request request : requests) {
                        Param param = serialNoUserMap.get(request.getSerialNo());
                        request.getFuture().complete(param);
                    }
                }catch (Exception e) {
                    e.printStackTrace();
                }finally {
                    SessionUtil.removeContext();
                }
                System.out.println("-----PostConstruct.run() end");
            }
        },0,10, TimeUnit.MILLISECONDS);
    }


    /**
     * 根据序列号查询
     * @param reqeusts
     * @return
     */
    private Map<String,Param> selectBatch(List<Request> reqeusts) {
        Map<String,Param> serialNo_param_map = new HashMap<>();
        Map<String,List<String>> idSerialNosMap = new HashMap<>();
        Map<Integer,Set<String>> clinicIdCodesMap = new HashMap<>(); //<clinicId,[code1,code2]>
        List<ParamBatchQuery> queryParamList = new ArrayList<>();

        for (Request request : reqeusts) {
            if(!clinicIdCodesMap.containsKey(request.getClinicId())) {
                Set<String> codeSet = new HashSet<>();
                codeSet.add(request.getCode());
                clinicIdCodesMap.put(request.getClinicId(),codeSet);
            } else {
                clinicIdCodesMap.get(request.getClinicId()).add(request.getCode());
            }

            if(!idSerialNosMap.containsKey(request.getId())) {
                List<String> serialNoList = new ArrayList<>();
                serialNoList.add(request.getSerialNo());
                idSerialNosMap.put(request.getId(),serialNoList);
            } else {
                idSerialNosMap.get(request.getId()).add(request.getSerialNo());
            }
        }

        List<ParamBatchQuery> listQueryParam = new ArrayList<>();
        for (Map.Entry<Integer, Set<String>> entry : clinicIdCodesMap.entrySet())
            listQueryParam.add(new ParamBatchQuery(entry.getKey(),entry.getValue()));

        List<Param> params = paramMapper.selectParamBatch(listQueryParam);

        System.out.println("----本次查询共获取:" + params.size() + "个结果");
        for(Param param : params) {
            List<String> serialNoList = idSerialNosMap.get(param.getClinicId()+"|"+param.getCode());
            for(String serialNo : serialNoList) {
                serialNo_param_map.put(serialNo,param);
            }
        }
        return serialNo_param_map;
    }

2. 一个参数版本:

    @Autowired
    UserMapper userMapper;

    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    class Request {
        String serialNo;
        Integer id;
        CompletableFuture<User> future;
    }

    LinkedBlockingDeque<Request> queue = new LinkedBlockingDeque();

    @PostConstruct
    public void init() {
        ScheduledExecutorService pool = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors() * 2 - 1);
        //每隔10毫秒根据队列里堆积的查询id执行一次批量查询
        pool.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                List<Request> requests = new ArrayList<>();
                Map<String,Request> serialNoRequestMap = new HashMap<>();
                List<Integer> ids = new ArrayList<>();
                if(queue.isEmpty()) {
                    return;
                }
                while(!queue.isEmpty()) {
                    Request request = queue.pop();
                    requests.add(request);
                    serialNoRequestMap.put(request.getSerialNo(),request);
                    ids.add(request.getId());
                }

                System.out.println("-----ids : " + ids);
                Map<String, User> serialNoUserMap = selectBatch(requests);
//                List<User> usersResult = userMapper.selectList(ids);

                for(Request request : requests) {
                    User user = serialNoUserMap.get(request.getSerialNo());
                    request.getFuture().complete(user);
                }
                System.out.println("-----PostConstruct.run() end");
            }
        },0,10, TimeUnit.MILLISECONDS);
    }


    /**
     * 根据序列号查询
     * @param reqeusts
     * @return
     */
    public Map<String,User> selectBatch(List<Request> reqeusts) {
        Map<String,User> serialNo_user_map = new HashMap<>();
        List<Integer> ids = new ArrayList<>();
        Map<Integer,List<String>> idSerialNosMap = new HashMap<>();
        for (Request request : reqeusts) {
            request.getSerialNo();
            ids.add(request.getId());
            if(!idSerialNosMap.containsKey(request.getId())) {
                List<String> serialNoList = new ArrayList<>();
                serialNoList.add(request.getSerialNo());
                idSerialNosMap.put(request.getId(),serialNoList);
            } else {
                idSerialNosMap.get(request.getId()).add(request.getSerialNo());
            }
        }

        List<User> users = userMapper.selectList(ids);
        System.out.println("----本次查询共获取:" + users.size() + "个结果");
        for(User user : users) {
            List<String> serialNoList = idSerialNosMap.get(user.getId());
            for(String serialNo : serialNoList) {
                serialNo_user_map.put(serialNo,user);
            }
        }
        return serialNo_user_map;
    }

    @Override
    public User selectByPrimaryKey(Integer id){
        Request request = new Request(UUID.randomUUID().toString(),id,new CompletableFuture<User>());
        queue.add(request);
        User user = null;
        try{
            user = request.future.get();
        } catch ( InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
        return user;
//        return userMapper.selectByPrimaryKey(id);
    }
上一篇下一篇

猜你喜欢

热点阅读