PMQ源码阅读(2)---消息产生和消费过程

2021-10-28  本文已影响0人  _Kantin

背景

消息产生和消费架构总览

PMQ架构图
PMQ消息端到端流程

Client端上报消息的过程

简单流程
客户端消息发送流程图
client代码流程
//topicName=消息topic的名字,ProducerDataDto=消息体中的内容
MqClient.publish(topicName, "",new ProducerDataDto("kantlin"+String.valueOf(i)));
    public boolean publish(PublishMessageRequest request, int retryTimes) {
        ......
        //这里集成了cat监控,对每个发送的消息进行链路追踪【排查问题的关键】
        Transaction transaction = Tracer.newTransaction("mq-client-publish", request.getTopicName());
        Timer.Context timer1 = MetricSingleton.getMetricRegistry()
                .timer("mq.client.publish.time?topic=" + request.getTopicName()).time();
        try {
            //上报到PMD broke的接口地址
            String url = MqConstanst.CONSUMERPRE + "/publish";
            //调用post请求将消息发送到broker中【转(3)】
            PublishMessageResponse response = post(request, url, retryTimes, PublishMessageResponse.class, true);
            ......
            transaction.setStatus(Transaction.SUCCESS);
            //如果可处理的发送未成功,则上报到cat中
            if (!response.isSuc()) {
                ......
                addCat(request2);
            }
            //返回给客户端本地上报结果
            return response.isSuc();
        } catch (Exception e) {
            //如果是未处理异常的话,除了会上报到cat还会发送对应的邮件告警
            ......
            addCat(request2);
            ......
            sendMail(mailRequest);
            return false;
        } finally {
            transaction.complete();
            timer1.stop();
        }
    }
    public String post(String url, Object reqObj) throws IOException, BrokerException {
        ......
        Response response = null;
        //基于cat进行链路追踪
        Transaction transaction = Tracer.newTransaction("mq-http", url);
        try {
            ......
            //获取http客户端并发送,有异常的话记录到cat中并向上抛,不做处理
            Request request=requestbuilder.build();
            response = client.newCall(request).execute();
            transaction.setStatus(Transaction.SUCCESS);
            ......
        }
            ......
        finally {
            transaction.complete();
            ......
        }
    }

Rest端接受消费者消息

简单流程
代码流程
    @PostMapping("/publish")
    public PublishMessageResponse publish(@RequestBody PublishMessageRequest request) {
        setSubEnv(request);
        //转【2】
        PublishMessageResponse response = consumerService.publish(request);
        return response;
    }
      public PublishMessageResponse publish(PublishMessageRequest request) {
        ......
        try {
            Map<String, List<QueueEntity>> queueMap = queueService.getAllLocatedTopicWriteQueue();
            Map<String, List<QueueEntity>> topicQueueMap = queueService.getAllLocatedTopicQueue();
            if (queueMap.containsKey(request.getTopicName()) || topicQueueMap.containsKey(request.getTopicName())) {
                List<QueueEntity> queueEntities = queueMap.get(request.getTopicName());
                if (queueEntities == null || queueEntities.size() == 0) {
                    ......
                    //如果可写队列中没有包含的话,但是总的topic中有,则可能表示有偏差,因此需要重新更新缓存数据
                    if (topicQueueMap.containsKey(request.getTopicName()) && soaConfig.getPublishMode() == 1) {
                        queueEntities = topicQueueMap.get(request.getTopicName());
                        updateQueueCache(request.getTopicName());
                    } else {
                        updateQueueCache(request.getTopicName());
                        return response;
                    }
                }
                //如果可写队列包含对应的topic name 那么才对消息进行保存
                if (queueEntities.size() > 0) {
                    //转【3】
                    saveMsg(request, response, queueEntities);
                }
            } else {
                ......
            }
        } catch (Exception e) {
            ......
        } finally {
            ......
        }
        return response;
    }
        protected void saveSynMsg1(PublishMessageRequest request, PublishMessageResponse response,
                               List<QueueEntity> queueEntities) {
        ......
        Map<String, PartitionInfo> partitionMap = new HashMap<>();
        Map<Long, List<Message01Entity>> msgQueueMap = new HashMap<>();
        //引用传递用于构造消息实体,如果request没有指定partitionInfo的话,那么partitionMap的key为默认值Long.MAX_VALUE
        createMsg(request, msgQueueMap, partitionMap);
        for (Map.Entry<Long, List<Message01Entity>> entry : msgQueueMap.entrySet()) {
            //分三种情况进行消息进行保存
            if (queueMap.containsKey(entry.getKey())) {
                doSaveMsg(request, response, Arrays.asList(queueMap.get(entry.getKey())), entry.getValue());
            } else if (entry.getKey() == Long.MAX_VALUE) {
                //转【4】
                doSaveMsg(request, response, queueEntities, entry.getValue());
            } else {
                entry.getValue().forEach(t1 -> {
                    if (partitionMap.containsKey(t1.getTraceId())) {
                        if (partitionMap.get(t1.getTraceId()).getStrictMode() == 0) {
                            doSaveMsg(request, response, queueEntities, Arrays.asList(t1));
                        }
                    }
                });
            }
        }
    }
private void doSaveMsg(PublishMessageRequest request, PublishMessageResponse response,
                           List<QueueEntity> queueEntities, List<Message01Entity> message01Entities) {
        int tryCount = 0;
        int queueSize = queueEntities.size();
        ......
        int count = counterTemp.get(key).incrementAndGet();
        while (tryCount <= queueSize) {
            try {
                QueueEntity temp = queueEntities.get(count % queueEntities.size());
                count++;
                ......
                //关键逻辑,上一步可支持写的队列都要保存消息
                //转【5】
                doSaveMsg(message01Entities, request, response, temp);
                ......
            } catch (Exception e) {
                ......
            }
        }
        if (last != null) {
            ......
        }
        ......
    }
    protected void doSaveMsg(List<Message01Entity> message01Entities, PublishMessageRequest request,
                             PublishMessageResponse response, QueueEntity temp) {
        //动态的设置service连接的数据库
        message01Service.setDbId(temp.getDbNodeId());
        ......
        try {
            ......
            //将消息批量的插入数据库中
            message01Service.insertBatchDy(request.getTopicName(), temp.getTbName(), message01Entities);
            if (soaConfig.getMqPushFlag() == 1) {
                //通知对应的client端来pull data进行消费转【6】
                notifyClient(temp);
            }
            ......
            return;
        } catch (Exception e) {
           ......
        } finally {
            ......
        }
    }
 public void notifyClient(QueueEntity queueEntity) {
        try {
             ......
            //获取消息主要是topic相关的全部订阅者的详情
            List<QueueOffsetEntity> queueOffsetList = queueIdQueueOffsetMap.get(queueEntity.getId());
             ......
            Map<String, List<MsgNotifyDto>> notifyMap = new HashMap<>();
             //对订阅者进行遍历
            for (QueueOffsetEntity queueOffset : queueOffsetList) {
                //如果订阅者可以立刻接收且满足限速条件的话,则立刻发送
                if (consumerGroupMap.get(queueOffset.getConsumerGroupName()).getPushFlag() == 1
                        && speedLimit(queueEntity.getId())) {

                    //拼接订阅者的回调client的ip和地址
                    String clienturl = "http://" + consumerVo.ip + ":" + consumerVo.port;

                    if (!notifyMap.containsKey(clienturl)) {
                        notifyMap.put(clienturl, new ArrayList<>());
                    }
                    //封装回调的消息体,这里只有消费者名和消息ID
                    MsgNotifyDto msgNotifyDto = new MsgNotifyDto();
                    msgNotifyDto.setConsumerGroupName(queueOffset.getConsumerGroupName());
                    msgNotifyDto.setQueueId(queueEntity.getId());
                    notifyMap.get(clienturl).add(msgNotifyDto);
                }
            }
            if (notifyMap.size() == 0) {
                return;
            }
            ......
            for (String url : notifyMap.keySet()) {
                //构建完客户端地址后,将消息体进行封装后发送给客户端接口
                try {
                    MsgNotifyRequest request = new MsgNotifyRequest();
                    request.setMsgNotifyDtos(notifyMap.get(url));
                    if (notifyFailTentativeLimit(url)) {
                        //请求的client的地址上/mq/client/notify
                        httpClient.postAsyn(url + "/mq/client/notify", request, new ConsumerServiceImpl.NotifyCallBack(url));
                    }
                } catch (Exception e) {
                    log.error("给客户端发送拉取通知异常:", e);
                }
            }
            ......
        } catch (Exception e) {

        }
    }

Client pull消息及消费过程

client消费消息的过程
Client pull消息代码流程
    @RequestMapping("/mq/client/notify")
    public void notify(@RequestBody MsgNotifyRequest request) {
        //如果客户端是开启标准(客户端有黑名单和不开启消费的却别)
        if (isOpenFlag()) {
            ......
            try {
                //转【2】
                msgNotifyService.notify(request);
                ......
            } catch (Exception e) {
                ......
            }
        }
    }
public void notify(MsgNotifyRequest request) {
        //从broke处获取消费者组的最新详情,同时更新缓存中的信息
        IConsumerPollingService consumerPollingService = MqClient.getMqFactory().createConsumerPollingService();
        Map<String, IMqGroupExcutorService> groups = consumerPollingService.getMqExcutors();
        if (groups != null && request != null && request.getMsgNotifyDtos() != null) {
            request.getMsgNotifyDtos().forEach(msgNotifyDto -> {
                if (groups.containsKey(msgNotifyDto.getConsumerGroupName())) {
                    //获取当前最新消费者组及其对应的队列详情
                    IMqGroupExcutorService iMqGroupExcutorService = groups.get(msgNotifyDto.getConsumerGroupName());
                    Map<Long, IMqQueueExcutorService> queues = iMqGroupExcutorService.getQueueEx();
                    if (queues.containsKey(msgNotifyDto.getQueueId())) {
                        //消费者组中包含了待消费消息的队列Id的话,则要进行消费(这里可以理解为消费前进行下判断)转【3】
                        queues.get(msgNotifyDto.getQueueId()).notifyMsg();
                    }
                }
            });
        }
    }
    protected boolean doPullingData() {
        if (pullFlag.compareAndSet(false, true)) {
            ......
            if (consumerQueueDto != null) {
                ......
                try {
                    ......
                    if (checkOffsetVersion(consumerQueueDto)) {
                        ......
                        PullDataResponse response = null;
                        if (checkOffsetVersion(consumerQueueDto)) {
                            //请求broke端的pullData方法来拉取数据
                            response = mqResource.pullData(request);
                        }
                        ......
                        if (response != null && response.getMsgs() != null && response.getMsgs().size() > 0) {
                            //如果请求的消息列表不为空,则缓存起来转【4】
                            cacheData(response, consumerQueueDto);
                            ......
                        }
                    }
                    ......
                } catch (Exception e) {
                    ......
                } finally {
                    ......
                }
            }
            return false;
        } else {
            return true;
        }
    }
 protected void cacheData(PullDataResponse response, ConsumerQueueDto pre) {
        if (checkOffsetVersion(pre)) {
            for (MessageDto t1 : response.getMsgs()) {
               ......
                //将请求回来的数据放到阻塞队列messages中,并记录log
                while (true && checkOffsetVersion(pre)) {
                    try {
                        messages.put(t1);
                        addPullLog(t1);
                        break;
                    } catch (Exception e) {
                    }
                   ......
                }
                ......
            }
        }
    }
Client 消费消息代码流程
    private static boolean doRegisterConsumerGroup(Map<String, ConsumerGroupVo> groups) {
        ......
        //消费组service在client初始化时候执行start方法
        consumerPollingService = mqFactory.createConsumerPollingService();
        //转【2】
        consumerPollingService.start();
        ......
    }
    @Override
    public void start() {
        if (startFlag.compareAndSet(false, true)) {
            ......
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    while (!isStop) {
                        ......
                        //启动常驻进程,每秒更新消费者组信息
                        longPolling();
                        ......
                    }
                }
            });
        }
    }

    protected void longPolling() {
        if (mqContext.getConsumerId() > 0 && mqContext.getConsumerGroupVersion() != null
                && mqContext.getConsumerGroupVersion().size() > 0) {
            ......
            //请求broker获取最新的消费者组的信息
            GetConsumerGroupResponse response = mqResource.getConsumerGroup(request);
            //处理相关的请求信息
            handleGroup(response);
            ......
        } else {
            Util.sleep(1000);
        }
    }

    protected void handleGroup(GetConsumerGroupResponse response) {
        ......
        //遍历消费者组,创建对应的消费组实例
        response.getConsumerGroups().entrySet().forEach(t1 -> {
            if (!isStop) {
                if (!mqExcutors.containsKey(t1.getKey())) {
                    mqExcutors.put(t1.getKey(), mqFactory.createMqGroupExcutorService());
                }
                log.info("consumer_group_data_change,消费者组" + t1.getKey() + "发生重平衡或者meta更新");
                // 进行重平衡操作或者更新元数据信息
                mqExcutors.get(t1.getKey()).rbOrUpdate(t1.getValue(), response.getServerIp());
                mqContext.getConsumerGroupVersion().put(t1.getKey(), t1.getValue().getMeta().getVersion());
            }
        });
        ......
        mqExcutors.values().forEach(t1 -> {
            //启动消费组实例MqGroupExcutorService类中的start方法,转【3】
            t1.start();
        });
    }
 //消费者组连续三次重平衡的版本号不变的话则开始启动组下队列的消费
    public void start() {
        if (!isRunning) {
            versionCount++;
            ......
            if (versionCount >= mqContext.getConfig().getRbTimes()) {
                //启动队列
                doStartQueue();
                isRunning = true;
            }
        }
    }

    protected void doStartQueue() {
        ......
        if (localConsumerGroup != null && localConsumerGroup.getQueues() != null
                && localConsumerGroup.getQueues().size() > 0) {
            //从消费者组中获取队列表,每个都单独启动消费实例
            localConsumerGroup.getQueues().values().forEach(t1 -> {
                IMqQueueExcutorService mqQueueExcutorService = mqFactory
                        .createMqQueueExcutorService(localConsumerGroup.getMeta().getName(), t1);
                mqEx.put(t1.getQueueId(), mqQueueExcutorService);
                //启动每个队列的监听和消费线程,转【4】
                mqQueueExcutorService.start();
            });
        }
         ......
    }
//此时监听到具体的待消费队列
    @Override
    public void start() {
        if (this.iSubscriber != null || this.iAsynSubscriber != null) {

            if (isStart.compareAndSet(false, true)) {
                //启动时会pull一次待消费的消息,后续的pull都需要由broke进行触发【如果开启立刻消费的话】
                executor.execute(new Runnable() {
                    @Override
                    public void run() {
                        pullingData();
                    }
                });
                executor.execute(new Runnable() {
                    @Override
                    public void run() {
                        while (!isStop) {
                            if (isRunning) {
                                //开启重置的消息出来线程,注意此处不能加锁,因为有些会出现延消费,然后出现阻塞
                                handleData();
                            } else {
                                Util.sleep(50);
                            }
                        }
                    }
                });
            }
        }
    }

    protected void handleData() {
        ......
        //阻塞队列messages中的消息量
        int msgSize = messages.size();
        //刷新订阅关系,根据消费者和topic name找到对应的消费处理类,更新在iSubscriber对象中
        refreshSubscriber();
        if (temp != null && msgSize > 0 && temp.getThreadSize() + 2 - executor.getActiveCount() > 0
                && (iSubscriber != null|| iAsynSubscriber != null)
                && (temp.getTimeout() == 0 || (temp.getTimeout() > 0 && timeOutCount.get() == 0))) {
            ......
            //开启处理数据
            doHandleData(temp, msgSize);
            ......
        } else {
            Util.sleep(10);
        }
    }

    private void doHandleData(ConsumerQueueDto pre, int msgSize) {
        // 线程批次概念关键代码
        ......
        CountDownLatch countDownLatch = new CountDownLatch(startThread);
        //按照批次进行消息消费
        batchExcute(pre, startThread, batchRecorderId, countDownLatch);
        ......
    }

    private void batchExcute(ConsumerQueueDto pre, int startThread, long batchRecorderId,
                             CountDownLatch countDownLatch) {
        for (int i = 0; i < startThread; i++) {
            if (executor != null) {
                //每个消息对应单独的处理类MsgThread,转【5】
                executor.execute(new MqQueueExcutorService.MsgThread(pre, batchRecorderId, countDownLatch, timeOutCount));
            }
        }
    }
    public class MsgThread implements Runnable {
        ......
        @Override
        public void run() {
            ......
            if (isRunning && checkOffsetVersion(pre)) {
                //启动消息消费
                maxId = threadExcute(pre);
                //更新offset
                updateOffset(pre, maxId);
            }
            ......
        }
    }
    protected long threadExcute(ConsumerQueueDto pre) {
        if (isRunning && (iSubscriber != null || iAsynSubscriber != null)) {
            ......
            //从
            if (messageMap.size() > 0) {
                ......
                //对消息进行记录并提交到对应处理类进行出来
                List<Long> failIds = invokeMessage(pre, messageMap);
                ......
            }
            ......
        }
        return 0;

    }
    protected List<Long> invokeMessage(ConsumerQueueDto temp, Map<Long, MessageDto> messageMap) {
        List<MessageDto> dtos = new ArrayList<>(messageMap.values());
        ......
        //消费消息
        failIds = doMessageReceived(dtos);
        ......
    }
    protected List<Long> doMessageReceived(List<MessageDto> dtos) throws Exception {
        if (consumerQueueRef.get().getTimeout() > 0) {
            return new MessageInvokeCommandForThreadIsolation(consumerGroupName, consumerQueueRef.get(), dtos,
                    iSubscriber,  iAsynSubscriber).execute();
        } else {
            //走此分支的话,则根据刚刚(4)中已经确定iSubscriber处理类来进行出来,转【6】
            return MessageInvokeCommandForThreadIsolation.invoke(dtos, iSubscriber,  iAsynSubscriber,
                    consumerQueueRef.get());
        }
    }
    public static List<Long> invoke(List<MessageDto> dtos, ISubscriber iSubscriber, IAsynSubscriber iAsynSubscriber,
                                    ConsumerQueueDto pre) throws Exception {
         ......
        if (iSubscriber != null) {
            //回到client消费实现类中
            failIds = iSubscriber.onMessageReceived(dtos);
             ......
        }else if (iAsynSubscriber != null) {
             ......
        }
        return failIds;
    }
    public class TestSub implements ISubscriber {
        @Override
        public List<Long> onMessageReceived(List<MessageDto> messages) {
            System.out.println(messages.get(0).getBody());
            return new ArrayList<>();
        }
    }
<?xml version="1.0" encoding="UTF-8" ?>
<messageQueue>
    <consumer groupName="test1sub">
        <topics>
            <topic name="test1" receiverType="com.ppdai.infrastructure.demo.TestSub"></topic>
            <topic name="test4" receiverType="com.ppdai.infrastructure.demo.TestSub"></topic>
        </topics>
    </consumer>
</messageQueue>
上一篇下一篇

猜你喜欢

热点阅读