PMQ源码阅读(3)---线程批次概念

2021-10-29  本文已影响0人  _Kantin

背景

概念

代码流程

    //线程批次入口
    private void doHandleData(ConsumerQueueDto pre, int msgSize) {
        //消费线程默认大小 - 核心线程活跃个数 = 核心线程空闲格式
        int threadSize = pre.getThreadSize() + 2 - executor.getActiveCount();
        int startThread = (int) ((msgSize + pre.getConsumerBatchSize() - 1) / pre.getConsumerBatchSize());
        //限制下每个批次最大数量也就是pre的默认值10
        if (startThread >= threadSize) {
            startThread = threadSize;
        }
        if (startThread > pre.getThreadSize()) {
            startThread = pre.getThreadSize();
        }
        //获取起进程批次号,转【2】
        long batchRecorderId = batchRecorder.begin(startThread);
        CountDownLatch countDownLatch = new CountDownLatch(startThread);
        //批量的进行消息消费,转【3】
        batchExcute(pre, startThread, batchRecorderId, countDownLatch);
        try {
            countDownLatch.await();
        } catch (InterruptedException e) {

        }
    }
    //在内存中缓存进程批次和详情
    public class BatchRecorder {
        //线程安全的hashMap
        public Map<Long, MqQueueExcutorService.BatchRecorderItem> recordMap = new ConcurrentHashMap<Long, MqQueueExcutorService.BatchRecorderItem>();
        // 记录最小的线程批次编号
        private volatile long start = 0L;
        // 记录当前开启的线程批次编号
        private volatile long current = 0L;
        private Object lockObject = new Object();
        
        //线程批次current自增
        public long begin(int threadCount) {
            current++;
            MqQueueExcutorService.BatchRecorderItem batchRecorderItem = new MqQueueExcutorService.BatchRecorderItem();
            batchRecorderItem.batchReacorderId = current;
            batchRecorderItem.threadCount = threadCount;
            recordMap.put(current, batchRecorderItem);
            //返回本次线程批次号
            return current;
        }
    }
    public class MsgThread implements Runnable {
        ......
        /***
         * @author: kantlin
         * @date: 2021/10/28 13:47
         * @description:【消费者message流程】    执行消费线程中的run方法
         */
        @Override
        public void run() {
            ......
            MqQueueExcutorService.BatchRecorderItem batchRecorderItem = null;
            long maxId = 0;
            try {
                if (isRunning && checkOffsetVersion(pre)) {
                    //从message队列中拉取消息进行消费,并记录下最大消息的maxId,转【4】
                    maxId = threadExcute(pre);
                    //更新pre中maxId的值
                    updateOffset(pre, maxId);
                }
            } catch (Exception e) {
            }
            //根据batchRecorderItem的值是否为null在判断可否提交,转【6】
            batchRecorderItem = batchRecorder.end(batchRecorderId, maxId);
            if (batchRecorderItem != null && iAsynSubscriber == null) {
                //若batchRecorderItem 不为空则执行提交逻辑,转【8】
                doCommit(pre, batchRecorderItem);
            }
            ......
        }
    }
    protected long threadExcute(ConsumerQueueDto pre) {
        if (isRunning && (iSubscriber != null || iAsynSubscriber != null)) {
            ......
            //拉取待消费的消息并获取最大的Id,转【5】
            MqQueueExcutorService.Pair<Long, Boolean> pair = prepareValue(pre, messageMap);
            long maxId = pair.item1;
            ......
            //从
            if (messageMap.size() > 0) {
                ......
                //消费消息
                List<Long> failIds = invokeMessage(pre, messageMap);
                ......
                return maxId;
            } else {
                ......
            }
        }
        return 0;
    }
    protected MqQueueExcutorService.Pair<Long, Boolean> prepareValue(ConsumerQueueDto pre, Map<Long, MessageDto> messageMap) {
        ......
        while (count < pre.getConsumerBatchSize()) {
            //从message队列拉取消息,数量为批次大小
            MessageDto messageDto = messages.poll();
            if (isRunning && messageDto != null && checkOffsetVersion(pre)) {
                if (onMsgFilter(messageDto)) {
                    if (checkTag(pre, messageDto)) {
                        if (checkDelay(messageDto, pre))
                            if (checkRetryCount(messageDto, pre)) {
                                ......
                            }
                    }
                }
                //前后对比获取拉取消息的最大ID
                maxId = maxId < messageDto.getId() ? messageDto.getId() : maxId;
                // flag = true;
                pair.item1 = maxId;
                pair.item2 = true;
            }
            count++;
        }
        return pair;
    }
    public MqQueueExcutorService.BatchRecorderItem end(long batchReacorderId, long maxId) {
        MqQueueExcutorService.BatchRecorderItem finishedItem = recordMap.get(batchReacorderId);
        if (finishedItem == null) {
            return null;
        }
        //本批次完成数+1
        int count = finishedItem.counter.incrementAndGet();
        //排它锁更新操作
        synchronized (lockObject) {
            //更新本次批次消费信息的最大maxId
            if (finishedItem.maxId < maxId) {
                finishedItem.maxId = maxId;
            }
            //判断是否本次已执行完成
            if (!finishedItem.batchFinished) {
                finishedItem.batchFinished = count == finishedItem.threadCount;
            }
        }
        //若已完成还得看是否为可提交批次,转【7】
        if (finishedItem.batchFinished) {
            MqQueueExcutorService.BatchRecorderItem rs = getLastestItem();
            return rs;
        }

        return null;
    }
    public MqQueueExcutorService.BatchRecorderItem getLastestItem() {
        MqQueueExcutorService.BatchRecorderItem finishedItem = null;
        boolean rs = false;
        //若首次提交start从0开始,后续start值等于最小提交批次
        for (long i = start + 1; i <= current; i++) {
            finishedItem = recordMap.get(i);
            if (finishedItem == null) {
                continue;
            }
            //若当前i !=current,且i批次未完成,则current还不可提交
            if (!finishedItem.batchFinished) {
                break;
            } else {
                rs = true;
            }
        }
        //current为最新可提交批次了,则返回为not null
        if (!rs) {
            finishedItem = null;
        }

        return finishedItem;
    }
    private void doCommit(ConsumerQueueDto temp, MqQueueExcutorService.BatchRecorderItem batchRecorderItem) {
        if (batchRecorderItem == null)
            return;
        CommitOffsetRequest request = new CommitOffsetRequest();
        if (checkOffsetVersion(temp)) {
            List<ConsumerQueueVersionDto> queueVersionDtos = new ArrayList<>();
            request.setQueueOffsets(queueVersionDtos);
            ConsumerQueueVersionDto consumerQueueVersionDto = new ConsumerQueueVersionDto();
            ......
            queueVersionDtos.add(consumerQueueVersionDto);
            //调用broke的/commitOffset接口
            mqResource.commitOffset(request);
        }
        batchRecorder.delete(batchRecorderItem.batchReacorderId);
    }

    //修改start的开始值并remove map中缓存
    public void delete(long batchReacorderId) {
        long temp = start;
        start = batchReacorderId;
        for (long i = temp + 1; i <= batchReacorderId; i++) {
            recordMap.remove(i);
        }
    }
上一篇 下一篇

猜你喜欢

热点阅读