RocketMQ文件过期策略详解

2019-03-23  本文已影响0人  漫步无法人生

1.为什么会有文件过期删除机制

由于RocketMQ操作CommitLog、ConsumeQueue文件是基于文件内存映射机制,并且在启动的时候会将所有的文件加载,为了避免内存与磁盘的浪费、能够让磁盘能够循环利用、避免因为磁盘不足导致消息无法写入等引入了文件过期删除机制

2.RocketMQ删除过期文件的思路

RocketMQ顺序写CommitLog文件、ComsumeQueue文件,所有的写操作都会落到最后一个文件上,因此在当前写文件之前的文件将不会有数据插入,也就不会有任何变动,因此可通过时间来做判断,比如超过72小时未更新的文件将会被删除

PS:RocketMQ删除过期文件时不会关注该文件的内容是否全部被消费

3.文件过期删除机制实现

3.1 触发删除的操作

image.png

由上图可知,触发文件清除操作的是一个定时任务,而且只有定时任务

// Resource reclaim interval
    private int cleanResourceInterval = 10000;

文件过期删除定时任务的周期由该删除决定,默认每10s执行一次

3.2 删除源码分析

MesssageSDefaultStore#CleanCommitLogService#deleteExpiredFiles

private void deleteExpiredFiles() {
//省略
}

我们一点一点来分析其中的代码

long fileReservedTime = DefaultMessageStore.this.getMessageStoreConfig().getFileReservedTime();
int deletePhysicFilesInterval = DefaultMessageStore.this.getMessageStoreConfig().getDeleteCommitLogFilesInterval();
int destroyMapedFileIntervalForcibly = DefaultMessageStore.this.getMessageStoreConfig().getDestroyMapedFileIntervalForcibly();

上面三个属性需要重点讲解下

  1. fileReservedTime:文件过期时间,也就是从文件最后一次的更新时间到现在为止,如果超过该时间,则是过期文件可被删除
  2. deletePhysicFilesInterval:删除物理文件的时间间隔,在一次定时任务触发时,可能会有多个物理文件超过过期时间可被删除,因此删除一个文件后需要间隔deletePhysicFilesInterval这个时间再删除另外一个文件,我猜测可能是由于删除文件是一个非常耗费IO的操作,会引起消息插入消费的延迟(相比于正常情况下),所以不建议直接删除所有过期文件
  3. destroyMapedFileIntervalForcibly:在删除文件时,如果该文件还被线程引用,此时会阻止此次删除操作,同时将该文件标记不可用并且纪录当前时间戳destroyMapedFileIntervalForcibly这个表示文件在第一次删除拒绝后,文件保存的最大时间,在此时间内一直会被拒绝删除,当超过这个时间时,会将引用每次减少1000,直到引用 小于等于 0为止,即可删除该文件,实现代码如下:

ReferenceResource#shutdown

public void shutdown(final long intervalForcibly) {
        if (this.available) {
            this.available = false;
            this.firstShutdownTimestamp = System.currentTimeMillis();
            this.release();
        } else if (this.getRefCount() > 0) {
            if ((System.currentTimeMillis() - this.firstShutdownTimestamp) >= intervalForcibly) {
                this.refCount.set(-1000 - this.getRefCount());
                this.release();
            }
        }
    }

MesssageSDefaultStore#CleanCommitLogService#deleteExpiredFiles

boolean timeup = this.isTimeToDelete();
boolean spacefull = this.isSpaceToDelete();
boolean manualDelete = this.manualDeleteFileSeveralTimes > 0;

if (timeup || spacefull || manualDelete) {
    //执行删除逻辑
}
  1. timeup:是否到了指定的时间点,RocketMQ可通过deleteWhere设置每天固定的时间删除过期文件,默认是凌晨4点,由于这个参数设定是小时,因此在这1个小时内每次定时任务都能进来删除过期的文件 image.png

UtilAll#isItTimeToDo

public static boolean isItTimeToDo(final String when) {
        String[] whiles = when.split(";");
        if (whiles.length > 0) {
            Calendar now = Calendar.getInstance();
            for (String w : whiles) {
                int nowHour = Integer.parseInt(w);
                if (nowHour == now.get(Calendar.HOUR_OF_DAY)) {
                    return true;
                }
            }
        }

        return false;
    }
  1. spacefull:磁盘空间是否充足,磁盘不足返回true,执行过期文件删除策略,我们进去看看this.isSpaceToDelete()这个方法

    MesssageSDefaultStore#CleanCommitLogService#isSpaceToDelete

    double ratio = DefaultMessageStore.this.getMessageStoreConfig().getDiskMaxUsedSpaceRatio() / 100.0;
    //是否立即清除
    cleanImmediately = false;
    {
        String storePathPhysic = DefaultMessageStore.this.getMessageStoreConfig().getStorePathCommitLog();
        double physicRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePathPhysic);
        if (physicRatio > diskSpaceWarningLevelRatio) { /**#1*/
            boolean diskok = DefaultMessageStore.this.runningFlags.getAndMakeDiskFull();
            if (diskok) {
                DefaultMessageStore.log.error("physic disk maybe full soon " + physicRatio + ", so mark disk full");
            }
            cleanImmediately = true;
        } else if (physicRatio > diskSpaceCleanForciblyRatio) {/**#2*/
            cleanImmediately = true;
        } else {/**#3*/
            boolean diskok = DefaultMessageStore.this.runningFlags.getAndMakeDiskOK();
            if (!diskok) {
                DefaultMessageStore.log.info("physic disk space OK " + physicRatio + ", so mark disk ok");
            }
        }
        if (physicRatio < 0 || physicRatio > ratio) {/**#4*/
            DefaultMessageStore.log.info("physic disk maybe full soon, so reclaim space, " + physicRatio);
            return true;
        }
    }
    

    1:物理使用率大于diskSpaceWarningLevelRatio(默认90%可通过参数设置),则会阻止新消息的插入

    2:物理使用率大于diskSpaceCleanForciblyRatio(默认85%,可设置),则过进行过期物理文件的删除

    3:恢复磁盘可写 配合 #1使用

    4:物理磁盘使用率小于diskMaxUsedSpaceRatio 表示磁盘使用正常

  2. manualDelete:预留,手工触发,目前rocketmq暂未封装

MappedFileQueue#deleteExpiredFileByTime

for (int i = 0; i < mfsLength; i++) {
    MappedFile mappedFile = (MappedFile) mfs[i];
    long liveMaxTimestamp = mappedFile.getLastModifiedTimestamp() + expiredTime;
    if (System.currentTimeMillis() >= liveMaxTimestamp || cleanImmediately) {
        if (mappedFile.destroy(intervalForcibly)) {
            files.add(mappedFile);
            deleteCount++;

            if (files.size() >= DELETE_FILES_BATCH_MAX) {
                break;
            }

            if (deleteFilesInterval > 0 && (i + 1) < mfsLength) {
                try {
                    Thread.sleep(deleteFilesInterval);
                } catch (InterruptedException e) {
                }
            }
        } else {
            break;
        }
    } else {
        //avoid deleting files in the middle
        break;
    }
}
image.png

从第一个文件开始遍历,判断该文件的最大存活时间(该文件的最后一次更新时间+文件的存活时间)小于 当前系统时间 或者 需要强制删除文件(上面有讲,哪些情况下强制删除),则执行MappedFile#destroy 方法清除MappedFile占用的相关资源,如果执行成功则将该文件加入待删除文件列表中统一从磁盘中删除。DELETE_FILES_BATCH_MAX 决定每次能够删除的文件个数上限、deleteFilesInterval连续清除两个文件的时间间隔由该参数决定(deletePhysicFilesInterval)


4.总结-整体流程

  1. 开启定时任务每10s扫描是否有文件需要删除
  2. 有三种情况会进入删除文件操作:到了deleteWhere指定的时间点(默认是凌晨4点)、磁盘不足、手动触发
  3. 对于磁盘不足的情况,当磁盘使用率大于磁盘空间警戒线水位(默认是90%),会阻止消息写入,当超过85%时会强制删除文件(需要设置允许强制删除参数,否者不生效),其他两种情况都只能删除过期的文件(文件最后更新时间+文件最大的存活时间 < 当前时间)
  4. 当被删除的文件存在引用时,会有一个文件删除缓存时间,在这段时间内,该文件不会被删除,主要是留给引用该文件程序一些时间,当超过了文件删除缓存时间后,每次都会将该文件的引用减少1000,直到减少小于等于0后才释放该文件引用的相关资源,然后将该文件放入一个“文件删除集合”中
  5. 一次连续删除文件中间会存在一定的间隔,不会连续释放文件相关的资源
  6. 一次连续删除的文件和不大于10
  7. 将“文件删除集合”中的文件从
上一篇下一篇

猜你喜欢

热点阅读