(11) Retry Consumption: The Delay Scheduling Mechanism

2021-07-27  guessguess

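When message consumption fails and the message is sent back to the broker, it carries a delay level. Before the message is stored, the broker rewrites its topic according to that delay level.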

public class CommitLog {
    public CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBrokerInner msg) {
        msg.setStoreTimestamp(System.currentTimeMillis());
        msg.setBodyCRC(UtilAll.crc32(msg.getBody()));
        AppendMessageResult result = null;

        StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();

        String topic = msg.getTopic();
        int queueId = msg.getQueueId();

        final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
        if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
                || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {

            if (msg.getDelayTimeLevel() > 0) {
                if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
                    msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
                }
                // Redirect the message to the schedule topic "SCHEDULE_TOPIC_XXXX"
                topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;
                queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());
                // Back up the original topic and queue id so the message can be forwarded later
                MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
                MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
                msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
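                // Switch the message to the schedule topic and queue
                msg.setTopic(topic);
                msg.setQueueId(queueId);
            }
        }
        // ... (the rest of the method is omitted in the original article)
            return putMessageResult;
        });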
    }
}
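The redirect step can be isolated in a small sketch. This is not the RocketMQ code: `Msg` is a hypothetical stripped-down message, the property key strings are illustrative stand-ins for `MessageConst.PROPERTY_REAL_TOPIC` and `MessageConst.PROPERTY_REAL_QUEUE_ID`, and the mapping "level N goes to queue N - 1" is an assumption about what `delayLevel2QueueId` does.

```java
import java.util.HashMap;
import java.util.Map;

final class DelayRedirectSketch {
    // Topic name quoted in the article for the broker-internal schedule topic.
    static final String SCHEDULE_TOPIC = "SCHEDULE_TOPIC_XXXX";
    // Illustrative property keys (assumptions, not taken from the article).
    static final String REAL_TOPIC = "REAL_TOPIC";
    static final String REAL_QUEUE_ID = "REAL_QID";

    /** Hypothetical message holding only the fields this step touches. */
    static final class Msg {
        String topic;
        int queueId;
        int delayLevel;
        final Map<String, String> properties = new HashMap<>();

        Msg(String topic, int queueId, int delayLevel) {
            this.topic = topic;
            this.queueId = queueId;
            this.delayLevel = delayLevel;
        }
    }

    /** Assumed mapping: delay level N is stored in queue N - 1 of the schedule topic. */
    static int delayLevel2QueueId(int delayLevel) {
        return delayLevel - 1;
    }

    /** Rewrites a delayed message in place; messages without a delay level are left untouched. */
    static void redirect(Msg msg, int maxDelayLevel) {
        if (msg.delayLevel <= 0) {
            return;
        }
        // Clamp the level to the highest one the broker supports.
        if (msg.delayLevel > maxDelayLevel) {
            msg.delayLevel = maxDelayLevel;
        }
        // Back up the real destination before overwriting it.
        msg.properties.put(REAL_TOPIC, msg.topic);
        msg.properties.put(REAL_QUEUE_ID, String.valueOf(msg.queueId));
        msg.topic = SCHEDULE_TOPIC;
        msg.queueId = delayLevel2QueueId(msg.delayLevel);
    }
}
```

Because the real topic and queue id travel with the message as properties, the broker needs no separate lookup table to forward the message once its delay has elapsed.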

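So how are the messages in the schedule topic SCHEDULE_TOPIC_XXXX consumed? This topic is internal to the broker, so both producing to it and consuming from it must be implemented inside the broker.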

Let's first look at the MessageStore interface.

MessageStore

This interface defines how data is read from, written to, and queried in the commitLog.

public interface MessageStore {
    boolean load();
    void start() throws Exception;
    void shutdown();
    void destroy();
    default CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBrokerInner msg) {
        return CompletableFuture.completedFuture(putMessage(msg));
    }
    default CompletableFuture<PutMessageResult> asyncPutMessages(final MessageExtBatch messageExtBatch) {
        return CompletableFuture.completedFuture(putMessages(messageExtBatch));
    }
    PutMessageResult putMessage(final MessageExtBrokerInner msg);
    PutMessageResult putMessages(final MessageExtBatch messageExtBatch);
    GetMessageResult getMessage(final String group, final String topic, final int queueId,
        final long offset, final int maxMsgNums, final MessageFilter messageFilter);
    long getMaxOffsetInQueue(final String topic, final int queueId);
    long getMinOffsetInQueue(final String topic, final int queueId);
    long getCommitLogOffsetInQueue(final String topic, final int queueId, final long consumeQueueOffset);
    long getOffsetInQueueByTime(final String topic, final int queueId, final long timestamp);
    MessageExt lookMessageByOffset(final long commitLogOffset);
    SelectMappedBufferResult selectOneMessageByOffset(final long commitLogOffset);
    SelectMappedBufferResult selectOneMessageByOffset(final long commitLogOffset, final int msgSize);
    String getRunningDataInfo();
    HashMap<String, String> getRuntimeInfo();
    long getMaxPhyOffset();
    long getMinPhyOffset();
    long getEarliestMessageTime(final String topic, final int queueId);
    long getEarliestMessageTime();
    long getMessageStoreTimeStamp(final String topic, final int queueId, final long consumeQueueOffset);
    long getMessageTotalInQueue(final String topic, final int queueId);
    SelectMappedBufferResult getCommitLogData(final long offset);
    boolean appendToCommitLog(final long startOffset, final byte[] data);
    void executeDeleteFilesManually();
    QueryMessageResult queryMessage(final String topic, final String key, final int maxNum, final long begin,
        final long end);
    void updateHaMasterAddress(final String newAddr);
    long slaveFallBehindMuch();
    long now();
    int cleanUnusedTopic(final Set<String> topics);
    void cleanExpiredConsumerQueue();
    boolean checkInDiskByConsumeOffset(final String topic, final int queueId, long consumeOffset);
    long dispatchBehindBytes();
    long flush();
    boolean resetWriteOffset(long phyOffset);
    long getConfirmOffset();
    void setConfirmOffset(long phyOffset);
    boolean isOSPageCacheBusy();
    long lockTimeMills();
    boolean isTransientStorePoolDeficient();
    LinkedList<CommitLogDispatcher> getDispatcherList();
    ConsumeQueue getConsumeQueue(String topic, int queueId);
    BrokerStatsManager getBrokerStatsManager();
    // Entry point of delay scheduling: starts or stops the schedule service depending on the broker role
    void handleScheduleMessageService(BrokerRole brokerRole);
}

A quick look at the methods it declares shows what the interface is for: interacting with the commitLog. Next, let's look at its implementation class, DefaultMessageStore.

DefaultMessageStore

The implementation of handleScheduleMessageService:

public class DefaultMessageStore implements MessageStore {
    private final ScheduleMessageService scheduleMessageService;
    @Override
    public void handleScheduleMessageService(final BrokerRole brokerRole) {
        if (this.scheduleMessageService != null) {
            if (brokerRole == BrokerRole.SLAVE) {
                this.scheduleMessageService.shutdown();
            } else {
                // Only a master broker performs delay scheduling
                this.scheduleMessageService.start();
            }
        }
    }
}

Now let's look directly at how start() works.

public class ScheduleMessageService extends ConfigManager {
    // Flag marking whether the service has been started
    private final AtomicBoolean started = new AtomicBoolean(false);
    // Timer that runs the scheduled tasks
    private Timer timer;
    // Delay level -> delay time in milliseconds
    private final ConcurrentMap<Integer /* level */, Long/* delay timeMillis */> delayLevelTable =
        new ConcurrentHashMap<Integer, Long>(32);
    // Delay level -> consume offset of that level's queue
    private final ConcurrentMap<Integer /* level */, Long/* offset */> offsetTable =
        new ConcurrentHashMap<Integer, Long>(32);


    public void start() {
        // Guard against starting twice
        if (started.compareAndSet(false, true)) {
            // Create a timer backed by a daemon thread
            this.timer = new Timer("ScheduleMessageTimerThread", true);
            // Walk the delay level table; each level has its own delay time
            for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {
                Integer level = entry.getKey();
                Long timeDelay = entry.getValue();
                Long offset = this.offsetTable.get(level);
                if (null == offset) {
                    offset = 0L;
                }
                
                if (timeDelay != null) {
                    // Create a timer task for this level and offset; the first run happens after FIRST_DELAY_TIME
                    this.timer.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME);
                }
            }

            this.timer.scheduleAtFixedRate(new TimerTask() {

                @Override
                public void run() {
                    try {
                        // Periodically persist the offsets
                        if (started.get()) ScheduleMessageService.this.persist();
                    } catch (Throwable e) {
                        log.error("scheduleAtFixedRate flush exception", e);
                    }
                }
            }, 10000, this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval());
        }
    }
}

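As the code above shows, start() creates one timer task per delay level, each running a DeliverDelayedMessageTimerTask.
It then creates one more periodic task that persists the offsets.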
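The article does not show how delayLevelTable gets its entries. As a rough sketch, assuming the table is derived from a space-separated spec in the style of RocketMQ's default messageDelayLevel setting ("1s 5s 10s 30s 1m ... 1h 2h"), it could be built as follows. The class and method names here are hypothetical and are not the broker's own parsing code.

```java
import java.util.Map;
import java.util.TreeMap;

final class DelayLevelTableSketch {
    /**
     * Parses a spec such as "1s 5s 1m" into a map of level -> delay in milliseconds.
     * Levels are numbered from 1 in the order they appear in the spec.
     */
    static Map<Integer, Long> parse(String spec) {
        Map<Integer, Long> table = new TreeMap<>();
        String[] parts = spec.trim().split("\\s+");
        for (int i = 0; i < parts.length; i++) {
            String part = parts[i];
            long value = Long.parseLong(part.substring(0, part.length() - 1));
            long unitMillis;
            switch (part.charAt(part.length() - 1)) {
                case 's': unitMillis = 1_000L; break;
                case 'm': unitMillis = 60_000L; break;
                case 'h': unitMillis = 3_600_000L; break;
                case 'd': unitMillis = 86_400_000L; break;
                default: throw new IllegalArgumentException("unknown time unit in: " + part);
            }
            table.put(i + 1, value * unitMillis);
        }
        return table;
    }
}
```

With a table like this, the loop in start() would create one timer task per entry, so an 18-entry spec yields 18 independent delivery tasks plus the single persistence task.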

How delayed delivery is handled for each delay level

public class ScheduleMessageService extends ConfigManager {
    class DeliverDelayedMessageTimerTask extends TimerTask {
        private final int delayLevel;
        private final long offset;

        public DeliverDelayedMessageTimerTask(int delayLevel, long offset) {
            this.delayLevel = delayLevel;
            this.offset = offset;
        }

        @Override
        public void run() {
            try {
                if (isStarted()) {
                    // The core of the implementation
                    this.executeOnTimeup();
                }
            } catch (Exception e) {
                log.error("ScheduleMessageService, executeOnTimeup exception", e);
                ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(
                    this.delayLevel, this.offset), DELAY_FOR_A_PERIOD);
            }
        }
    }
}
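Note that the task above is one-shot: instead of looping, it schedules a fresh copy of itself when it needs to run again. A java.util.TimerTask instance can be scheduled only once, which is why a new object carrying the offset is created each time. A minimal sketch of that pattern, with a hypothetical `PollTask` and an arbitrary 10 ms retry delay standing in for the broker's constants:

```java
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class SelfReschedulingTaskSketch {
    // Arbitrary delay for the sketch; not one of the broker's constants.
    static final long RETRY_DELAY_MS = 10L;

    final Timer timer = new Timer("SketchTimerThread", true); // daemon thread
    final AtomicInteger runs = new AtomicInteger();
    final CountDownLatch done = new CountDownLatch(1);
    final int maxRuns;
    volatile long lastOffset = -1L;

    SelfReschedulingTaskSketch(int maxRuns) {
        this.maxRuns = maxRuns;
    }

    /** One-shot task that hands its progress (the offset) to the next instance. */
    final class PollTask extends TimerTask {
        private final long offset;

        PollTask(long offset) {
            this.offset = offset;
        }

        @Override
        public void run() {
            lastOffset = offset;
            if (runs.incrementAndGet() >= maxRuns) {
                done.countDown(); // stop rescheduling once the sketch has run enough times
                return;
            }
            // A TimerTask cannot be scheduled twice, so create a new one with the next offset.
            timer.schedule(new PollTask(offset + 1), RETRY_DELAY_MS);
        }
    }

    void start() {
        timer.schedule(new PollTask(0L), RETRY_DELAY_MS);
    }

    /** Waits for the last run; returns false on timeout or interruption. */
    boolean awaitDone(long timeoutMillis) {
        try {
            return done.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

Carrying the offset in the task's constructor means each run knows where the previous one stopped without reading shared state.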

As the code above shows, the core of the implementation is the executeOnTimeup method.
Let's see how it works.

public class ScheduleMessageService extends ConfigManager {
    class DeliverDelayedMessageTimerTask extends TimerTask {
        private final int delayLevel;
        private final long offset;
        public void executeOnTimeup() {
            // 1. Find the consume queue for the schedule topic and this delay level (it is created if it does not exist). If the queue is still empty, nothing is delivered and the task is simply rescheduled.
            ConsumeQueue cq =
                ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(TopicValidator.RMQ_SYS_SCHEDULE_TOPIC,
                    delayLevel2QueueId(delayLevel));

            long failScheduleOffset = offset;

            if (cq != null) {
                SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset);
                if (bufferCQ != null) {
                    try {
                        long nextOffset = offset;
                        int i = 0;
                        ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
                        for (; i < bufferCQ.getSize(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
                            long offsetPy = bufferCQ.getByteBuffer().getLong();
                            int sizePy = bufferCQ.getByteBuffer().getInt();
                            long tagsCode = bufferCQ.getByteBuffer().getLong();

                            if (cq.isExtAddr(tagsCode)) {
                                if (cq.getExt(tagsCode, cqExtUnit)) {
                                    tagsCode = cqExtUnit.getTagsCode();
                                } else {
                                    //can't find ext content.So re compute tags code.
                                    log.error("[BUG] can't find consume queue extend file content!addr={}, offsetPy={}, sizePy={}",
                                        tagsCode, offsetPy, sizePy);
                                    long msgStoreTime = defaultMessageStore.getCommitLog().pickupStoreTimestamp(offsetPy, sizePy);
                                    tagsCode = computeDeliverTimestamp(delayLevel, msgStoreTime);
                                }
                            }

                            long now = System.currentTimeMillis();
                            long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode);

                            nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);

                            long countdown = deliverTimestamp - now;

                            if (countdown <= 0) {
                                // The delay has elapsed: load the message from the commitLog by its offset
                                MessageExt msgExt =
                                    ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(
                                        offsetPy, sizePy);

                                if (msgExt != null) {
                                    try {
                                        // 2. Restore the original message
                                        MessageExtBrokerInner msgInner = this.messageTimeup(msgExt);
                                        if (TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC.equals(msgInner.getTopic())) {
                                            log.error("[BUG] the real topic of schedule msg is {}, discard the msg. msg={}",
                                                    msgInner.getTopic(), msgInner);
                                            continue;
                                        }
                                        // 3. Write the restored message back to the store
                                        PutMessageResult putMessageResult =
                                            ScheduleMessageService.this.writeMessageStore
                                                .putMessage(msgInner);

                                        if (putMessageResult != null
                                            && putMessageResult.getPutMessageStatus() == PutMessageStatus.PUT_OK) {
                                            continue;
                                        } else {
                                            // The write failed: retry from this entry after a delay
                                            ScheduleMessageService.this.timer.schedule(
                                                new DeliverDelayedMessageTimerTask(this.delayLevel,
                                                    nextOffset), DELAY_FOR_A_PERIOD);
                                            // Record the offset reached for this delay level
                                            ScheduleMessageService.this.updateOffset(this.delayLevel,
                                                nextOffset);
                                            // End this run
                                            return;
                                        }
                                    } catch (Exception e) {
                                        log.error(
                                            "ScheduleMessageService, messageTimeup execute error, drop it. msgExt="
                                                + msgExt + ", nextOffset=" + nextOffset + ",offsetPy="
                                                + offsetPy + ",sizePy=" + sizePy, e);
                                    }
                                }
                            } else {
                                ScheduleMessageService.this.timer.schedule(
                                    new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset),
                                    countdown);
                                ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
                                return;
                            }
                        } 

                        nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
                        ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(
                            this.delayLevel, nextOffset), DELAY_FOR_A_WHILE);
                        ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
                        return;
                    } finally {

                        bufferCQ.release();
                    }
                } // end of if (bufferCQ != null)
                else {

                    long cqMinOffset = cq.getMinOffsetInQueue();
                    if (offset < cqMinOffset) {
                        failScheduleOffset = cqMinOffset;
                        log.error("schedule CQ offset invalid. offset=" + offset + ", cqMinOffset="
                            + cqMinOffset + ", queueId=" + cq.getQueueId());
                    }
                }
            } // end of if (cq != null)

            ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel,
                failScheduleOffset), DELAY_FOR_A_WHILE);
        }
    }
}

The code above is fairly involved, but its core is simple: take messages out of the queue, restore them, and write them back.
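The scan loop is easier to follow with the I/O stripped away. The sketch below assumes each consume queue entry is 20 bytes (an 8-byte commitLog offset, a 4-byte size, and an 8-byte tagsCode) and that, for the schedule topic, the tagsCode slot holds the delivery timestamp, as the call correctDeliverTimestamp(now, tagsCode) suggests. The class, `encode`, and `scan` are hypothetical names; the ext-address lookup and timestamp correction are left out.

```java
import java.nio.ByteBuffer;
import java.util.List;

final class ScheduleQueueScanSketch {
    // Assumed entry layout: long offset (8) + int size (4) + long tagsCode (8).
    static final int UNIT_SIZE = 20;

    /** Builds a buffer of entries; each row is {commitLogOffset, size, deliverTimestamp}. */
    static ByteBuffer encode(long[][] entries) {
        ByteBuffer buf = ByteBuffer.allocate(entries.length * UNIT_SIZE);
        for (long[] e : entries) {
            buf.putLong(e[0]).putInt((int) e[1]).putLong(e[2]);
        }
        buf.flip();
        return buf;
    }

    /**
     * Walks entries in order, collecting the commitLog offsets of messages that are due.
     * Stops at the first entry that is not yet due and returns the queue offset to resume from.
     */
    static long scan(ByteBuffer buf, long startOffset, long now, List<Long> due) {
        ByteBuffer view = buf.duplicate(); // leave the caller's position untouched
        int size = view.remaining();
        int i = 0;
        for (; i < size; i += UNIT_SIZE) {
            long offsetPy = view.getLong();
            view.getInt(); // message size, unused in this sketch
            long deliverTimestamp = view.getLong();
            long countdown = deliverTimestamp - now;
            if (countdown > 0) {
                // Not due yet: the real task would reschedule itself after `countdown` ms.
                return startOffset + (i / UNIT_SIZE);
            }
            due.add(offsetPy);
        }
        // Every entry in the buffer was due; resume just past the last one.
        return startOffset + (i / UNIT_SIZE);
    }
}
```

Stopping at the first entry that is not yet due is safe because every message in one queue shares the same delay level, so entries are ordered by delivery time.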

Restoring the message

The restore method is shown below.

public class ScheduleMessageService extends ConfigManager {
    class DeliverDelayedMessageTimerTask extends TimerTask {
        private final int delayLevel;
        private final long offset;
        private MessageExtBrokerInner messageTimeup(MessageExt msgExt) {
            MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
            msgInner.setBody(msgExt.getBody());
            msgInner.setFlag(msgExt.getFlag());
            MessageAccessor.setProperties(msgInner, msgExt.getProperties());

            TopicFilterType topicFilterType = MessageExt.parseTopicFilterType(msgInner.getSysFlag());
            long tagsCodeValue =
                MessageExtBrokerInner.tagsString2tagsCode(topicFilterType, msgInner.getTags());
            msgInner.setTagsCode(tagsCodeValue);
            msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties()));

            msgInner.setSysFlag(msgExt.getSysFlag());
            msgInner.setBornTimestamp(msgExt.getBornTimestamp());
            msgInner.setBornHost(msgExt.getBornHost());
            msgInner.setStoreHost(msgExt.getStoreHost());
            msgInner.setReconsumeTimes(msgExt.getReconsumeTimes());

            msgInner.setWaitStoreMsgOK(false);
            MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_DELAY_TIME_LEVEL);
            // Restore the topic: the real topic and queue id were saved in the message properties
            msgInner.setTopic(msgInner.getProperty(MessageConst.PROPERTY_REAL_TOPIC));
            String queueIdStr = msgInner.getProperty(MessageConst.PROPERTY_REAL_QUEUE_ID);
            int queueId = Integer.parseInt(queueIdStr);
            msgInner.setQueueId(queueId);

            return msgInner;
        }
    }
}
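Reduced to its essentials, the restore step reads the saved destination back out of the properties and drops the delay level. The sketch below uses a hypothetical `Msg` type, and the property key strings are illustrative stand-ins for the MessageConst constants.

```java
import java.util.HashMap;
import java.util.Map;

final class RestoreMessageSketch {
    // Illustrative property keys (assumptions, not taken from the article).
    static final String REAL_TOPIC = "REAL_TOPIC";
    static final String REAL_QUEUE_ID = "REAL_QID";
    static final String DELAY_LEVEL = "DELAY";

    /** Hypothetical message holding only the fields this step touches. */
    static final class Msg {
        String topic;
        int queueId;
        final Map<String, String> properties = new HashMap<>();
    }

    /** Builds a new message addressed to the destination saved before the redirect. */
    static Msg restore(Msg scheduled) {
        Msg restored = new Msg();
        restored.properties.putAll(scheduled.properties);
        // Drop the delay level so the message is not treated as delayed again.
        restored.properties.remove(DELAY_LEVEL);
        restored.topic = restored.properties.get(REAL_TOPIC);
        restored.queueId = Integer.parseInt(restored.properties.get(REAL_QUEUE_ID));
        return restored;
    }
}
```

Clearing the delay level matters: the store path shown at the start of the article redirects any message whose delay level is greater than 0, so a restored message that kept it would go straight back to the schedule topic.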

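In short, delay scheduling assigns one timer task to each delay level. Each task consumes the messages in that level's queue, where "consuming" means restoring the message and writing it to the queue of its real topic.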
