RocketMQ源码解读程序员

定时消息机制

2021-01-12  本文已影响0人  93张先生

概览

定时消息是指消息发送到Broker后,并不立即被消费者消费而是要等到特定的时间后才能被消费。

broker有配置项messageDelayLevel,默认值为“1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h”,18个level。可以配置自定义messageDelayLevel。注意,messageDelayLevel是broker的属性,不属于某个topic。发消息时,设置delayLevel等级即可:msg.setDelayLevel(level)。level有以下三种情况:

所有延迟消息都使用同一主题SCHEDULE_TOPIC_XXXX,这一主题下一个消息延迟级别,一个消息队列MessageQueue,一个定时任务TimerTask进行处理。如果消息的delayLevel大于0,代表延迟消息,首先将它的原来topic、queueId存入到消息属性中,然后改变消息的topic为SCHEDULE_TOPIC_XXXX、queueId为delayLevel-1这一队列中;然后存入到commitlog中。TimerTask执行定时任务,从延迟队列取出这个消息,根据topic,queueId获取consumequeue,从consumequeue中获取commitlog的message,再将message的REAL_TOPIC、REAL_QID属性进行topic、queueId还原,再次存入到commitlog文件中,等待消费者消费。

加载

延迟消费队列的消费进度定时会存储到 ${ROCKET HOME}rocketmq/store/config/delayOffset.json文件中,所以在项目启动时,需要加载历史消费记录;还要完成消息延迟延迟消息级别和延迟时间的delayLevelTable数据的构造;

delayOffset.json
键值对为queueId:offset
{
    "offsetTable":{12:0,6:0,13:0,5:1,18:0,3:22}
}

// 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
private final ConcurrentMap<Integer /* level */, Long/* delay timeMillis */> delayLevelTable =
        new ConcurrentHashMap<Integer, Long>(32);
public boolean load() {
    boolean result = super.load();
    // delayLevelTable数据的构造
    result = result && this.parseDelayLevel();
    return result;
}

启动定时服务

  1. 为每一个延迟队列创建一个调度任务,每一个调度任务对应SCHEDULE_TOPIC_XXXX主题下的一个消息消费队列;
  2. 启动定时任务,每隔10s持久化延迟消息队列消息进度。
public void start() {
    if (started.compareAndSet(false, true)) {
        this.timer = new Timer("ScheduleMessageTimerThread", true);
        for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {
            Integer level = entry.getKey();
            Long timeDelay = entry.getValue();
            Long offset = this.offsetTable.get(level);
            if (null == offset) {
                offset = 0L;
            }

            if (timeDelay != null) {
                // 定时任务,每一个定时任务启动时,首先延迟1s;以后每次调度时,首先延迟0.1s,然后再执行定时调度时间
                this.timer.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME);
            }
        }
        // Broker端默认10s持久化一次消息进度,存储文件名:${RocketMQ_ HOME}/store/config/consumerOffset.json
        this.timer.scheduleAtFixedRate(new TimerTask() {

            @Override
            public void run() {
                try {
                    if (started.get()) ScheduleMessageService.this.persist();
                } catch (Throwable e) {
                    log.error("scheduleAtFixedRate flush exception", e);
                }
            }
        }, 10000, this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval());
    }
}
DeliverDelayedMessageTimerTask

DeliverDelayedMessageTimerTask继承TimerTask是延迟消息定时任务,一个延迟队列一个延时TimeTask任务。TimeTask定时时间机制为延迟1秒执行第一次,以后每0.1秒执行一次DeliverDelayedMessageTimerTask任务;也就是executeOnTimeup()方法,处理一个延迟级别的消息队列,根据延迟级别消息队列的消费offset值,取出在这个延迟级别消息队列offset后面的message,进行延迟时间判断,如果第一个Message的延迟时间不满足,那就继续提交这个DeliverDelayedMessageTimerTask定时任务,其他后面消息也不满足延迟时间,直接return;如果第一个Message到了延迟时间,还原真实的Topic、QueueId将消息再次放入commitlog中,进入真实的topic的队列。如果过程中出现了异常情况,也是重新提交这个DeliverDelayedMessageTimerTask定时任务。

@Override
public void run() {
    try {
        if (isStarted()) {
            // 
            this.executeOnTimeup();
        }
    } catch (Exception e) {
        // XXX: warn and notify me
        log.error("ScheduleMessageService, executeOnTimeup exception", e);
        ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(
            this.delayLevel, this.offset), DELAY_FOR_A_PERIOD);
    }
}

public void executeOnTimeup() {
    // 查找SCHEDULE_TOPIC_XXXX下延迟消息的ConsumeQueue
    ConsumeQueue cq =
        ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(TopicValidator.RMQ_SYS_SCHEDULE_TOPIC,
            delayLevel2QueueId(delayLevel));
    // 先初始化失败调度offset为最开始offset,后面再更新
    long failScheduleOffset = offset;

    if (cq != null) {
        // 第一次获取时offset初始值为0;获取这个offset之后的所有message值
        SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset);
        if (bufferCQ != null) {
            try {
                long nextOffset = offset;
                int i = 0;
                // ConsumeQueueExt 存储ConsumeQueue的扩展属性;CqExtUnit为扩展属性byte字节大小
                ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
                // 计算出一个message的offset,消息大小,tagHashCode
                for (; i < bufferCQ.getSize(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
                    // 消息物理偏移量
                    long offsetPy = bufferCQ.getByteBuffer().getLong();
                    // 消息大小
                    int sizePy = bufferCQ.getByteBuffer().getInt();
                    // tagHashCode
                    long tagsCode = bufferCQ.getByteBuffer().getLong();

                    if (cq.isExtAddr(tagsCode)) {
                        if (cq.getExt(tagsCode, cqExtUnit)) {
                            tagsCode = cqExtUnit.getTagsCode();
                        } else {
                            //can't find ext content.So re compute tags code.
                            log.error("[BUG] can't find consume queue extend file content!addr={}, offsetPy={}, sizePy={}",
                                tagsCode, offsetPy, sizePy);
                            long msgStoreTime = defaultMessageStore.getCommitLog().pickupStoreTimestamp(offsetPy, sizePy);
                            tagsCode = computeDeliverTimestamp(delayLevel, msgStoreTime);
                        }
                    }

                    long now = System.currentTimeMillis();
                    // 计算是否到达定期时间
                    long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode);
                    // 下一条消息在consumequeue中的offset大小,每个消息在consumequeue中存储大小为20
                    nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
                    // 计算消息是否到期剩余时间
                    long countdown = deliverTimestamp - now;
                    // 到达消息指定延迟时间
                    if (countdown <= 0) {
                        // 根据物理offset、消息大小从commitlog获取消息
                        MessageExt msgExt =
                            ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(
                                offsetPy, sizePy);
                        // 获取到消息
                        if (msgExt != null) {
                            try {
                                // 还原成为原来的消息,还原topic,queueId
                                MessageExtBrokerInner msgInner = this.messageTimeup(msgExt);
                                if (TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC.equals(msgInner.getTopic())) {
                                    log.error("[BUG] the real topic of schedule msg is {}, discard the msg. msg={}",
                                            msgInner.getTopic(), msgInner);
                                    continue;
                                }
                                // 重新存储消息到commitlog文件中
                                PutMessageResult putMessageResult =
                                    ScheduleMessageService.this.writeMessageStore
                                        .putMessage(msgInner);
                                // 存储成功,继续下一条消息
                                if (putMessageResult != null
                                    && putMessageResult.getPutMessageStatus() == PutMessageStatus.PUT_OK) {
                                    continue;
                                } else {
                                    // XXX: warn and notify me
                                    // 失败记录log
                                    // 再次执行延时任务,更新不同延时级别对应的消费消息offset
                                    log.error(
                                        "ScheduleMessageService, a message time up, but reput it failed, topic: {} msgId {}",
                                        msgExt.getTopic(), msgExt.getMsgId());
                                    ScheduleMessageService.this.timer.schedule(
                                        new DeliverDelayedMessageTimerTask(this.delayLevel,
                                            nextOffset), DELAY_FOR_A_PERIOD);
                                    ScheduleMessageService.this.updateOffset(this.delayLevel,
                                        nextOffset);
                                    return;
                                }
                            } catch (Exception e) {
                                /*
                                 * XXX: warn and notify me
                                 */
                                log.error(
                                    "ScheduleMessageService, messageTimeup execute error, drop it. msgExt="
                                        + msgExt + ", nextOffset=" + nextOffset + ",offsetPy="
                                        + offsetPy + ",sizePy=" + sizePy, e);
                            }
                        }
                    } else {
                        // 未到消息指定的延迟时间,继续构建一个延时任务放入定时任务中,更新等待时间
                        ScheduleMessageService.this.timer.schedule(
                            new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset),
                            countdown);
                        ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
                        return;
                    }
                } // end of for

                // 再次创建延时等待任务,等待下次延迟队列中的数据,进行处理。更新延迟队列消费的offset
                nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
                ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(
                    this.delayLevel, nextOffset), DELAY_FOR_A_WHILE);
                ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
                return;
            } finally {
                // 释放获取到的内存消息
                bufferCQ.release();
            }
        } // end of if (bufferCQ != null)
        else {
            //未找到有效的消息,更新延迟队列定时拉取进度
            long cqMinOffset = cq.getMinOffsetInQueue();
            if (offset < cqMinOffset) {
                failScheduleOffset = cqMinOffset;
                log.error("schedule CQ offset invalid. offset=" + offset + ", cqMinOffset="
                    + cqMinOffset + ", queueId=" + cq.getQueueId());
            }
        }
    } // end of if (cq != null)
    //未从commitlog中获取到延时消息,创建延迟任务,等待commitlog中消息的到来。
    ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel,
        failScheduleOffset), DELAY_FOR_A_WHILE);
}
上一篇下一篇

猜你喜欢

热点阅读