Scheduled Message Mechanism
Overview
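A scheduled message is not consumed as soon as it reaches the Broker. Consumers can consume it only after a specific time has passed.
The broker has a configuration item, messageDelayLevel. Its default value is "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h", which defines 18 levels. You can configure a custom messageDelayLevel. Note that messageDelayLevel is a property of the broker, not of any particular topic. When sending a message, you only need to set its delay level with msg.setDelayTimeLevel(level). There are three cases for level:
- level == 0: the message is not delayed
- 1 <= level <= maxLevel: the message is delayed by the time configured for that level; for example, level == 1 means a 1s delay
- level > maxLevel: level is treated as maxLevel; for example, with the default 18 levels, level == 20 gives a 2h delay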
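The three cases above can be sketched as a small pure function. This is an illustrative helper, not RocketMQ code. The class and method names are hypothetical, and the sketch assumes the default 18-level table. The text below says a message counts as delayed when delayLevel > 0, so this sketch also treats negative levels like 0.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical helper illustrating how a requested delay level is interpreted. */
public class DelayLevelRules {
    // Default messageDelayLevel: level 1 is index 0, level 18 is index 17
    static final List<String> DEFAULT_LEVELS = Arrays.asList(
        "1s", "5s", "10s", "30s", "1m", "2m", "3m", "4m", "5m",
        "6m", "7m", "8m", "9m", "10m", "20m", "30m", "1h", "2h");

    /** Returns the level actually applied: 0 means not delayed, otherwise capped at maxLevel. */
    static int effectiveLevel(int requested, int maxLevel) {
        if (requested <= 0) {
            return 0; // non-delayed message
        }
        return Math.min(requested, maxLevel); // levels above maxLevel are capped
    }

    /** Returns the delay label for a requested level, or "none" for a normal message. */
    static String delayOf(int requested) {
        int level = effectiveLevel(requested, DEFAULT_LEVELS.size());
        return level == 0 ? "none" : DEFAULT_LEVELS.get(level - 1);
    }

    public static void main(String[] args) {
        System.out.println(delayOf(0));  // none
        System.out.println(delayOf(1));  // 1s
        System.out.println(delayOf(20)); // 2h
    }
}
```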
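All delayed messages share one topic, SCHEDULE_TOPIC_XXXX. Within this topic, each delay level has one message queue (MessageQueue), and one timer task (TimerTask) processes it. A message whose delayLevel is greater than 0 is a delayed message. The broker first saves the message's original topic and queueId in its properties (REAL_TOPIC and REAL_QID). It then changes the topic to SCHEDULE_TOPIC_XXXX and the queueId to delayLevel - 1, and stores the message in the commitlog. The TimerTask for that level reads the delay queue in four steps. It uses the topic and queueId to locate the ConsumeQueue. It uses each ConsumeQueue entry to fetch the message from the commitlog. It restores the topic and queueId from the REAL_TOPIC and REAL_QID properties. Finally, it writes the message into the commitlog again, where consumers of the original topic can consume it.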
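Here is a minimal sketch of the topic/queueId rewrite and its reversal. SimpleMessage and its fields are hypothetical stand-ins for RocketMQ's message classes. Only three details come from the description above: the topic name SCHEDULE_TOPIC_XXXX, the property keys REAL_TOPIC and REAL_QID, and the rule queueId = delayLevel - 1. The sketch also clears the delay level on restore, on the assumption that the re-put message must not be routed back into the delay queue.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical model of how a delayed message is redirected and later restored. */
public class DelayTopicRewrite {
    static final String SCHEDULE_TOPIC = "SCHEDULE_TOPIC_XXXX";

    static class SimpleMessage {
        String topic;
        int queueId;
        int delayLevel;
        final Map<String, String> properties = new HashMap<>();

        SimpleMessage(String topic, int queueId, int delayLevel) {
            this.topic = topic;
            this.queueId = queueId;
            this.delayLevel = delayLevel;
        }
    }

    /** On store: remember the real destination and redirect to the level's delay queue. */
    static void toDelayQueue(SimpleMessage msg) {
        if (msg.delayLevel <= 0) {
            return; // not a delayed message, stored as-is
        }
        msg.properties.put("REAL_TOPIC", msg.topic);
        msg.properties.put("REAL_QID", String.valueOf(msg.queueId));
        msg.topic = SCHEDULE_TOPIC;
        msg.queueId = msg.delayLevel - 1; // one queue per delay level
    }

    /** When due: restore the real topic and queueId from the saved properties. */
    static void restore(SimpleMessage msg) {
        msg.topic = msg.properties.get("REAL_TOPIC");
        msg.queueId = Integer.parseInt(msg.properties.get("REAL_QID"));
        msg.delayLevel = 0; // clear the delay level so the re-put message is not delayed again
    }

    public static void main(String[] args) {
        SimpleMessage m = new SimpleMessage("OrderTopic", 3, 5);
        toDelayQueue(m);
        System.out.println(m.topic + " / " + m.queueId); // SCHEDULE_TOPIC_XXXX / 4
        restore(m);
        System.out.println(m.topic + " / " + m.queueId); // OrderTopic / 3
    }
}
```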
Loading
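The broker periodically persists the consumption progress of the delay queues to config/delayOffset.json under its store root directory (storePathRootDir, which defaults to ${user.home}/store). The broker must therefore load this saved progress at startup. It must also build delayLevelTable, which maps each delay level to its delay time.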
delayOffset.json
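Each key-value pair is level:offset. The key is the delay level (its queueId in SCHEDULE_TOPIC_XXXX is level - 1), and the value is the consumption offset of that level's queue.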
{
"offsetTable":{12:0,6:0,13:0,5:1,18:0,3:22}
}
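To make the file format concrete, here is a sketch of an offset table keyed by delay level, rendered in the same shape as the delayOffset.json example above. DelayOffsetTable and its methods are hypothetical. The hand-rolled toJson() only mimics the file layout and sorts by level for stable output. The real file is written by the broker's own serializer, so its key order may differ.

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Hypothetical per-level offset table mirroring the delayOffset.json layout. */
public class DelayOffsetTable {
    // key: delay level, value: consumption offset in that level's queue
    private final ConcurrentMap<Integer, Long> offsetTable = new ConcurrentHashMap<>();

    /** Records the offset to resume from for a level. */
    void updateOffset(int delayLevel, long offset) {
        offsetTable.put(delayLevel, offset);
    }

    /** Offset to start from for a level; 0 if the level has never been processed. */
    long startOffset(int delayLevel) {
        return offsetTable.getOrDefault(delayLevel, 0L);
    }

    /** Renders {"offsetTable":{level:offset,...}} with unquoted integer keys, sorted by level. */
    String toJson() {
        StringBuilder sb = new StringBuilder("{\"offsetTable\":{");
        boolean first = true;
        for (Map.Entry<Integer, Long> e : new TreeMap<Integer, Long>(offsetTable).entrySet()) {
            if (!first) {
                sb.append(',');
            }
            sb.append(e.getKey()).append(':').append(e.getValue());
            first = false;
        }
        return sb.append("}}").toString();
    }

    public static void main(String[] args) {
        DelayOffsetTable t = new DelayOffsetTable();
        t.updateOffset(5, 1);
        t.updateOffset(3, 22);
        System.out.println(t.toJson());       // {"offsetTable":{3:22,5:1}}
        System.out.println(t.startOffset(1)); // 0
    }
}
```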
// 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
private final ConcurrentMap<Integer /* level */, Long /* delay timeMillis */> delayLevelTable =
    new ConcurrentHashMap<Integer, Long>(32);

public boolean load() {
    // Load the persisted delay-queue offsets (delayOffset.json)
    boolean result = super.load();
    // Build delayLevelTable (level -> delay in milliseconds) from messageDelayLevel
    result = result && this.parseDelayLevel();
    return result;
}
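The load() method above delegates to parseDelayLevel(). The sketch below shows what that step has to do: turn the messageDelayLevel string into a level-to-milliseconds table. It assumes the supported unit suffixes are s, m, h and d. DelayLevelParser is a hypothetical name. This version also simply throws on an unknown unit instead of returning a success flag.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical parser from a messageDelayLevel string to level -> delay (ms). */
public class DelayLevelParser {
    // Assumed unit suffixes and their length in milliseconds
    private static final Map<Character, Long> TIME_UNITS = new HashMap<>();
    static {
        TIME_UNITS.put('s', 1000L);
        TIME_UNITS.put('m', 1000L * 60);
        TIME_UNITS.put('h', 1000L * 60 * 60);
        TIME_UNITS.put('d', 1000L * 60 * 60 * 24);
    }

    /** Levels are numbered from 1 in list order. */
    static Map<Integer, Long> parse(String messageDelayLevel) {
        Map<Integer, Long> table = new TreeMap<>();
        String[] items = messageDelayLevel.trim().split("\\s+");
        for (int i = 0; i < items.length; i++) {
            String item = items[i];
            char unit = item.charAt(item.length() - 1);
            Long unitMillis = TIME_UNITS.get(unit);
            if (unitMillis == null) {
                throw new IllegalArgumentException("unknown time unit in: " + item);
            }
            long amount = Long.parseLong(item.substring(0, item.length() - 1));
            table.put(i + 1, amount * unitMillis);
        }
        return table;
    }

    public static void main(String[] args) {
        Map<Integer, Long> t = parse("1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h");
        System.out.println(t.size());  // 18
        System.out.println(t.get(1));  // 1000
        System.out.println(t.get(18)); // 7200000
    }
}
```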
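Starting the scheduling service
- Create one scheduled task per delay level; each task handles one message queue of the SCHEDULE_TOPIC_XXXX topic.
- Start a periodic task that persists the consumption progress of the delay queues, every 10s by default (flushDelayOffsetInterval).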
public void start() {
    if (started.compareAndSet(false, true)) {
        this.timer = new Timer("ScheduleMessageTimerThread", true);
        for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {
            Integer level = entry.getKey();
            Long timeDelay = entry.getValue();
            // Resume from this level's persisted offset, or from 0 if there is none
            Long offset = this.offsetTable.get(level);
            if (null == offset) {
                offset = 0L;
            }
            if (timeDelay != null) {
                // One task per delay level: the first run happens FIRST_DELAY_TIME (1s) after startup,
                // and after that the task reschedules itself with a delay chosen on each run
                this.timer.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME);
            }
        }
        // Persist the delay-queue offsets (delayOffset.json) every flushDelayOffsetInterval ms
        // (10s by default), starting 10s after startup
        this.timer.scheduleAtFixedRate(new TimerTask() {
            @Override
            public void run() {
                try {
                    if (started.get()) ScheduleMessageService.this.persist();
                } catch (Throwable e) {
                    log.error("scheduleAtFixedRate flush exception", e);
                }
            }
        }, 10000, this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval());
    }
}
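The per-level scheduling in start() can be sketched with a plain java.util.Timer. This is a simplified model, not the broker's code. ScheduleStartSketch and initialOffsets are hypothetical names. The task body only prints instead of scanning a delay queue, and the periodic persist() task is left out.

```java
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/** Hypothetical model of start(): one timer task per level, resuming from stored offsets. */
public class ScheduleStartSketch {
    static final long FIRST_DELAY_TIME = 1000L; // first run 1s after startup

    /** Offset each level's first task starts from: the stored offset, or 0 if none. */
    static Map<Integer, Long> initialOffsets(Map<Integer, Long> delayLevelTable,
                                             Map<Integer, Long> offsetTable) {
        Map<Integer, Long> plan = new TreeMap<>();
        for (Integer level : delayLevelTable.keySet()) {
            plan.put(level, offsetTable.getOrDefault(level, 0L));
        }
        return plan;
    }

    public static void main(String[] args) throws InterruptedException {
        Map<Integer, Long> delayLevelTable = new TreeMap<>();
        delayLevelTable.put(1, 1000L);
        delayLevelTable.put(2, 5000L);
        Map<Integer, Long> offsetTable = new ConcurrentHashMap<>();
        offsetTable.put(2, 7L); // only level 2 has persisted progress

        Map<Integer, Long> plan = initialOffsets(delayLevelTable, offsetTable);
        CountDownLatch firstRuns = new CountDownLatch(plan.size());
        Timer timer = new Timer("ScheduleMessageTimerThread", true); // daemon thread
        for (Map.Entry<Integer, Long> e : plan.entrySet()) {
            timer.schedule(new TimerTask() {
                @Override
                public void run() {
                    System.out.println("level " + e.getKey() + " starts at offset " + e.getValue());
                    firstRuns.countDown();
                }
            }, FIRST_DELAY_TIME);
        }
        firstRuns.await(5, TimeUnit.SECONDS); // wait for every level's first run
        timer.cancel();
    }
}
```

In the broker, each task reschedules itself after every run, so the timer keeps a single pending task per level instead of using scheduleAtFixedRate.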
DeliverDelayedMessageTimerTask
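DeliverDelayedMessageTimerTask extends TimerTask and delivers delayed messages. There is one such task per delay queue. It first runs 1s after the service starts (FIRST_DELAY_TIME). Each run calls executeOnTimeup(), which processes the queue of one delay level. Starting from the level's current consumption offset, it reads the ConsumeQueue entries after that offset and checks each message's delivery time in order. For every message that is due, it restores the real topic and queueId and writes the message back to the commitlog, into the queue of its real topic. When it reaches a message that is not yet due, it schedules a new DeliverDelayedMessageTimerTask to fire when that message becomes due, then returns. Every message in the same level has the same delay and was stored later than the ones before it, so none of the later messages can be due either. After all available entries are processed, the next run is scheduled 0.1s later (DELAY_FOR_A_WHILE). If re-putting a message fails, or if executeOnTimeup() throws an exception, the task is rescheduled 10s later (DELAY_FOR_A_PERIOD).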
@Override
public void run() {
    try {
        if (isStarted()) {
            this.executeOnTimeup();
        }
    } catch (Exception e) {
        // XXX: warn and notify me
        log.error("ScheduleMessageService, executeOnTimeup exception", e);
        // Retry this level from the same offset after DELAY_FOR_A_PERIOD (10s)
        ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(
            this.delayLevel, this.offset), DELAY_FOR_A_PERIOD);
    }
}
public void executeOnTimeup() {
    // Find this level's ConsumeQueue under SCHEDULE_TOPIC_XXXX (queueId = delayLevel - 1)
    ConsumeQueue cq =
        ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(TopicValidator.RMQ_SYS_SCHEDULE_TOPIC,
            delayLevel2QueueId(delayLevel));
    // Offset to resume from if nothing can be read; starts as the current offset and may be corrected below
    long failScheduleOffset = offset;
    if (cq != null) {
        // Get the ConsumeQueue entries starting at this.offset (0 on the very first run)
        SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset);
        if (bufferCQ != null) {
            try {
                long nextOffset = offset;
                int i = 0;
                // CqExtUnit holds one entry of the ConsumeQueue extension file (ConsumeQueueExt)
                ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
                // Each 20-byte entry holds: commitlog offset (8 bytes), message size (4), tagsCode (8)
                for (; i < bufferCQ.getSize(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
                    // Physical offset of the message in the commitlog
                    long offsetPy = bufferCQ.getByteBuffer().getLong();
                    // Message size
                    int sizePy = bufferCQ.getByteBuffer().getInt();
                    // For SCHEDULE_TOPIC_XXXX, tagsCode holds the delivery timestamp, not a tag hash
                    long tagsCode = bufferCQ.getByteBuffer().getLong();
                    if (cq.isExtAddr(tagsCode)) {
                        if (cq.getExt(tagsCode, cqExtUnit)) {
                            tagsCode = cqExtUnit.getTagsCode();
                        } else {
                            // Can't find ext content, so recompute tags code.
                            log.error("[BUG] can't find consume queue extend file content!addr={}, offsetPy={}, sizePy={}",
                                tagsCode, offsetPy, sizePy);
                            long msgStoreTime = defaultMessageStore.getCommitLog().pickupStoreTimestamp(offsetPy, sizePy);
                            tagsCode = computeDeliverTimestamp(delayLevel, msgStoreTime);
                        }
                    }
                    long now = System.currentTimeMillis();
                    // Delivery timestamp, corrected if it is implausibly far in the future
                    long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode);
                    // Logical offset of the current entry (byte position / 20): where to resume if we stop here
                    nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
                    // Time remaining until the message is due
                    long countdown = deliverTimestamp - now;
                    // The message is due
                    if (countdown <= 0) {
                        // Fetch the message from the commitlog by physical offset and size
                        MessageExtBrokerInner msgInner = null; // (declared in try below)
                        MessageExt msgExt =
                            ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(
                                offsetPy, sizePy);
                        if (msgExt != null) {
                            try {
                                // Rebuild the original message, restoring its real topic and queueId
                                msgInner = this.messageTimeup(msgExt);
                                if (TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC.equals(msgInner.getTopic())) {
                                    log.error("[BUG] the real topic of schedule msg is {}, discard the msg. msg={}",
                                        msgInner.getTopic(), msgInner);
                                    continue;
                                }
                                // Write the restored message into the commitlog again
                                PutMessageResult putMessageResult =
                                    ScheduleMessageService.this.writeMessageStore
                                        .putMessage(msgInner);
                                // Stored successfully: move on to the next entry
                                if (putMessageResult != null
                                    && putMessageResult.getPutMessageStatus() == PutMessageStatus.PUT_OK) {
                                    continue;
                                } else {
                                    // XXX: warn and notify me
                                    // Re-put failed: log it, retry this message after DELAY_FOR_A_PERIOD (10s),
                                    // and record this level's offset
                                    log.error(
                                        "ScheduleMessageService, a message time up, but reput it failed, topic: {} msgId {}",
                                        msgExt.getTopic(), msgExt.getMsgId());
                                    ScheduleMessageService.this.timer.schedule(
                                        new DeliverDelayedMessageTimerTask(this.delayLevel,
                                            nextOffset), DELAY_FOR_A_PERIOD);
                                    ScheduleMessageService.this.updateOffset(this.delayLevel,
                                        nextOffset);
                                    return;
                                }
                            } catch (Exception e) {
                                /*
                                 * XXX: warn and notify me
                                 */
                                log.error(
                                    "ScheduleMessageService, messageTimeup execute error, drop it. msgExt="
                                        + msgExt + ", nextOffset=" + nextOffset + ",offsetPy="
                                        + offsetPy + ",sizePy=" + sizePy, e);
                            }
                        }
                    } else {
                        // Not yet due: schedule a new task to fire after countdown ms and record the offset
                        ScheduleMessageService.this.timer.schedule(
                            new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset),
                            countdown);
                        ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
                        return;
                    }
                } // end of for
                // All readable entries were processed: check again after DELAY_FOR_A_WHILE (0.1s)
                // from the next unread offset, and record that offset
                nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
                ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(
                    this.delayLevel, nextOffset), DELAY_FOR_A_WHILE);
                ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
                return;
            } finally {
                // Release the buffer obtained from the ConsumeQueue
                bufferCQ.release();
            }
        } // end of if (bufferCQ != null)
        else {
            // No entries readable at this offset. If the offset has fallen below the queue's
            // minimum offset (older files were deleted), resume from the minimum offset instead
            long cqMinOffset = cq.getMinOffsetInQueue();
            if (offset < cqMinOffset) {
                failScheduleOffset = cqMinOffset;
                log.error("schedule CQ offset invalid. offset=" + offset + ", cqMinOffset="
                    + cqMinOffset + ", queueId=" + cq.getQueueId());
            }
        }
    } // end of if (cq != null)
    // No ConsumeQueue or no new entries yet: check again after DELAY_FOR_A_WHILE (0.1s)
    ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel,
        failScheduleOffset), DELAY_FOR_A_WHILE);
}
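The core of executeOnTimeup() is an ordered scan that stops at the first message that is not yet due. The sketch below models one pass over a single level's queue in memory. Each entry is just a delivery timestamp (store time plus the level's delay), and "delivering" means recording the entry's logical offset. Pass, scan and the helper names are hypothetical. Commitlog access, ConsumeQueue extension entries and the failure paths are omitted. The correction rule treats a timestamp later than now plus the level's delay as due immediately. It is assumed to match correctDeliverTimestamp, so treat it as an assumption rather than the broker's exact code.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical in-memory model of one executeOnTimeup() pass for a single delay level. */
public class DeliverScanSketch {
    static final long DELAY_FOR_A_WHILE = 100L; // poll interval once the queue is drained

    /** Outcome of one pass: delivered offsets, where to resume, and when to run next. */
    static final class Pass {
        final List<Long> delivered = new ArrayList<>();
        long nextOffset;
        long nextDelayMillis;
    }

    /** Delivery time of a message stored at storeTimestamp in a level with the given delay. */
    static long computeDeliverTimestamp(long storeTimestamp, long levelDelayMillis) {
        return storeTimestamp + levelDelayMillis;
    }

    /** Assumed correction: a timestamp beyond now + level delay is treated as due now. */
    static long correctDeliverTimestamp(long now, long deliverTimestamp, long levelDelayMillis) {
        return deliverTimestamp > now + levelDelayMillis ? now : deliverTimestamp;
    }

    static Pass scan(long[] deliverTimestamps, long offset, long now, long levelDelayMillis) {
        Pass pass = new Pass();
        long cur = offset;
        for (; cur < deliverTimestamps.length; cur++) {
            long deliverAt = correctDeliverTimestamp(now, deliverTimestamps[(int) cur], levelDelayMillis);
            long countdown = deliverAt - now;
            if (countdown > 0) {
                // Not due yet, so later entries in this level are not due either
                pass.nextOffset = cur;            // re-check this entry on the next run
                pass.nextDelayMillis = countdown; // wake up exactly when it becomes due
                return pass;
            }
            pass.delivered.add(cur); // due: restore topic/queueId and re-put (omitted here)
        }
        // Every available entry was handled: poll again shortly from the next unread offset
        pass.nextOffset = cur;
        pass.nextDelayMillis = DELAY_FOR_A_WHILE;
        return pass;
    }

    public static void main(String[] args) {
        long levelDelay = 5_000L;
        long[] queue = {
            computeDeliverTimestamp(3_000L, levelDelay), // due at 8000
            computeDeliverTimestamp(4_000L, levelDelay), // due at 9000
            computeDeliverTimestamp(7_000L, levelDelay)  // due at 12000
        };
        Pass p = scan(queue, 0, 10_000L, levelDelay);
        System.out.println(p.delivered + " next=" + p.nextOffset + " in " + p.nextDelayMillis + "ms");
        // [0, 1] next=2 in 2000ms
    }
}
```

Rescheduling with the exact countdown avoids busy polling while the head message waits. The short DELAY_FOR_A_WHILE interval applies only when the queue has been drained.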