store模块阅读11:MappedFileQueue

2017-10-19  本文已影响318人  赤子心_d709

说明

这个类是管理MappedFile的队列的,与MappedFile是1:n的关系
作用:

提供了一系列根据offset或者timestamp来查找,定位,删除,清除MappedFile的逻辑
提供flushedWhere,committedWhere 标记整个队列中flush以及commit的offset

属性

    private static final Logger log = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
    private static final Logger LOG_ERROR = LoggerFactory.getLogger(LoggerName.STORE_ERROR_LOGGER_NAME);

    private static final int DELETE_FILES_BATCH_MAX = 10;//一次最多删除的文件数量

    private final String storePath;//文件队列的存储路径

    private final int mappedFileSize;//一个mappedFile文件大小,见MessageStoreConfig.mapedFileSizeCommitLog,默认1G

    private final CopyOnWriteArrayList<MappedFile> mappedFiles = new CopyOnWriteArrayList<MappedFile>();//写时复制,

    private final AllocateMappedFileService allocateMappedFileService;//分配mappedFile的线程服务

    private long flushedWhere = 0;//已经flush到的位置(是某一个mappedFile中的一个位置)
    private long committedWhere = 0;//已经commit到的位置(是某一个mappedFile中的一个位置)

    private volatile long storeTimestamp = 0;

注意:

CopyOnWriteArrayList队列:是写的时候copy一个出来写,写完了再把引用指过去,参照refer
flushedWhere, committedWhere 代表flush和commit的偏移地址

函数

构造函数

    public MappedFileQueue(final String storePath, int mappedFileSize,
        AllocateMappedFileService allocateMappedFileService) {
        this.storePath = storePath;
        this.mappedFileSize = mappedFileSize;
        this.allocateMappedFileService = allocateMappedFileService;
    }

分别是存储路径,文件大小,分配服务

检查

checkSelf完成自检
检查mappedFiles中,除去最后一个文件,其余每一个mappedFile的大小是否是mappedFileSize

    public void checkSelf() {

        if (!this.mappedFiles.isEmpty()) {
            Iterator<MappedFile> iterator = mappedFiles.iterator();
            MappedFile pre = null;
            while (iterator.hasNext()) {
                MappedFile cur = iterator.next();

                if (pre != null) {
                    if (cur.getFileFromOffset() - pre.getFileFromOffset() != this.mappedFileSize) {
                        LOG_ERROR.error("[BUG]The mappedFile queue's data is damaged, the adjacent mappedFile's offset don't match. pre file {}, cur file {}",
                            pre.getFileName(), cur.getFileName());
                    }
                }
                pre = cur;
            }
        }
    }

get属性信息相关

这里涉及commit以及flush位置的获取,判断

函数 作用
getMaxOffset 实际上就是commit的位置
getMaxWrotePosition wrote到的位置
remainHowManyDataToCommit wrote到的位置,到queue中记录的commit的位置之差
remainHowManyDataToFlush commit到的位置,到queue中记录的flush的位置之差

如下

    /**
     * 获取最大偏移量
     * 即最后一个MappedFile允许读到的位置
     */
    public long getMaxOffset() {
        MappedFile mappedFile = getLastMappedFile();
        if (mappedFile != null) {
            return mappedFile.getFileFromOffset() + mappedFile.getReadPosition();//相对偏移 + 允许读到的位置
        }
        return 0;
    }

    /**
     * 获取最大写的位置
     * 即最后一个MappedFile写到的位置
     */
    public long getMaxWrotePosition() {
        MappedFile mappedFile = getLastMappedFile();
        if (mappedFile != null) {
            return mappedFile.getFileFromOffset() + mappedFile.getWrotePosition();
        }
        return 0;
    }

    /**
     * 还有多少字节等待commit的
     * 即wrote与commit位置之差
     */
    public long remainHowManyDataToCommit() {
        return getMaxWrotePosition() - committedWhere;
    }

    /**
     * 还有多少字节等待flush的
     * 即flush与commit位置之差
     */
    public long remainHowManyDataToFlush() {
        return getMaxOffset() - flushedWhere;
    }

getMappedFile相关

getMappedFileByTime

获取最后修改时间在timestamp之后的第一个mappedFile,没有的话就返回最后一个mappedFile

    public MappedFile getMappedFileByTime(final long timestamp) {
        Object[] mfs = this.copyMappedFiles(0);

        if (null == mfs)
            return null;

        for (int i = 0; i < mfs.length; i++) {
            MappedFile mappedFile = (MappedFile) mfs[i];
            if (mappedFile.getLastModifiedTimestamp() >= timestamp) {
                return mappedFile;
            }
        }

        return (MappedFile) mfs[mfs.length - 1];
    }

findMappedFileByOffset

通过offset找到所在的mappedFile

    public MappedFile findMappedFileByOffset(final long offset, final boolean returnFirstOnNotFound) {
        try {
            MappedFile mappedFile = this.getFirstMappedFile();
            if (mappedFile != null) {
                int index = (int) ((offset / this.mappedFileSize) - (mappedFile.getFileFromOffset() / this.mappedFileSize));
                if (index < 0 || index >= this.mappedFiles.size()) {
                    LOG_ERROR.warn("Offset for {} not matched. Request offset: {}, index: {}, " +
                            "mappedFileSize: {}, mappedFiles count: {}",
                        mappedFile,
                        offset,
                        index,
                        this.mappedFileSize,
                        this.mappedFiles.size());
                }

                try {
                    return this.mappedFiles.get(index);
                } catch (Exception e) {
                    if (returnFirstOnNotFound) {
                        return mappedFile;//遇到异常,允许返回第一个
                    }
                    LOG_ERROR.warn("findMappedFileByOffset failure. ", e);
                }
            }
        } catch (Exception e) {
            log.error("findMappedFileByOffset Exception", e);
        }

        return null;
    }


    public MappedFile findMappedFileByOffset(final long offset) {
        return findMappedFileByOffset(offset, false);
    }

getFirstMappedFile

返回mappedFiles队列中的第一个文件

    public MappedFile getFirstMappedFile() {
        MappedFile mappedFileFirst = null;

        if (!this.mappedFiles.isEmpty()) {
            try {
                mappedFileFirst = this.mappedFiles.get(0);
            } catch (IndexOutOfBoundsException e) {
                //ignore
            } catch (Exception e) {
                log.error("getFirstMappedFile has exception.", e);
            }
        }

        return mappedFileFirst;
    }

getLastMappedFile

有几种形式

    /**
     * 返回mappedFiles中最后一个mappedFile,
     * 如果mappedFiles为空,根据startOffset以及needCreate判断是否需要创建出来最新的mappedFile
     * 如果mappedFiles最后一个写满了,根据needCreate判断是否需要创建出来最新的mappedFile
     */
    public MappedFile getLastMappedFile(final long startOffset, boolean needCreate) {
        long createOffset = -1;
        MappedFile mappedFileLast = getLastMappedFile();

        if (mappedFileLast == null) {
            createOffset = startOffset - (startOffset % this.mappedFileSize);
        }

        if (mappedFileLast != null && mappedFileLast.isFull()) {
            createOffset = mappedFileLast.getFileFromOffset() + this.mappedFileSize;
        }

        if (createOffset != -1 && needCreate) {
            String nextFilePath = this.storePath + File.separator + UtilAll.offset2FileName(createOffset);
            String nextNextFilePath = this.storePath + File.separator
                + UtilAll.offset2FileName(createOffset + this.mappedFileSize);
            MappedFile mappedFile = null;

            if (this.allocateMappedFileService != null) {
                mappedFile = this.allocateMappedFileService.putRequestAndReturnMappedFile(nextFilePath,
                    nextNextFilePath, this.mappedFileSize);//创建nextFilePath,并将nextNextFilePath放入队列,以便异步处理
            } else {
                try {
                    mappedFile = new MappedFile(nextFilePath, this.mappedFileSize);//同步创建
                } catch (IOException e) {
                    log.error("create mappedFile exception", e);
                }
            }

            if (mappedFile != null) {
                if (this.mappedFiles.isEmpty()) {
                    mappedFile.setFirstCreateInQueue(true);//队列中创建的第一个
                }
                this.mappedFiles.add(mappedFile);
            }

            return mappedFile;
        }

        return mappedFileLast;
    }

    /**
     * 返回mappedFiles中最后一个mappedFile,
     * 如果mappedFiles为空,根据startOffset创建出来最新的mappedFile
     * 如果mappedFiles最后一个写满了,则创建出来最新的mappedFile
     */
    public MappedFile getLastMappedFile(final long startOffset) {
        return getLastMappedFile(startOffset, true);
    }

    //返回mappedFiles中最后一个mappedFile
    public MappedFile getLastMappedFile() {
        MappedFile mappedFileLast = null;

        while (!this.mappedFiles.isEmpty()) {
            try {
                mappedFileLast = this.mappedFiles.get(this.mappedFiles.size() - 1);
                break;
            } catch (IndexOutOfBoundsException e) {
                //continue;
            } catch (Exception e) {
                log.error("getLastMappedFile has exception.", e);
                break;
            }
        }

        return mappedFileLast;
    }

reset相关

将offset以后的MappedFile都清除掉,但是代码似乎有bug,在吐槽中说

    public boolean resetOffset(long offset) {
        MappedFile mappedFileLast = getLastMappedFile();

        if (mappedFileLast != null) {
            long lastOffset = mappedFileLast.getFileFromOffset() +
                mappedFileLast.getWrotePosition();
            long diff = lastOffset - offset;//最后写到的位置 与 要求的offset之间的差距

            final int maxDiff = this.mappedFileSize * 2;
            if (diff > maxDiff)//如果超过了2个fileSize就失败
                return false;
        }

        ListIterator<MappedFile> iterator = this.mappedFiles.listIterator();//得到一个iter

        while (iterator.hasPrevious()) {//这里似乎是bug,总会返回false,因为上一行cursor会是0
            mappedFileLast = iterator.previous();
            if (offset >= mappedFileLast.getFileFromOffset()) {
                int where = (int) (offset % mappedFileLast.getFileSize());
                mappedFileLast.setFlushedPosition(where);
                mappedFileLast.setWrotePosition(where);
                mappedFileLast.setCommittedPosition(where);
                break;
            } else {
                iterator.remove();
            }
        }
        return true;
    }

清理

truncateDirtyFiles

/**
* 处理offset以上的MappedFile,认为是dirty的
* MappedFile包含了offset的去掉offset后续部分
* MappedFile超过了offset的直接删掉
*/

    public void truncateDirtyFiles(long offset) {
        List<MappedFile> willRemoveFiles = new ArrayList<MappedFile>();

        for (MappedFile file : this.mappedFiles) {
            long fileTailOffset = file.getFileFromOffset() + this.mappedFileSize;//得到每个MappedFile结束时偏移
            if (fileTailOffset > offset) {
                if (offset >= file.getFileFromOffset()) {//包含有offset的文件,去掉后续尾巴
                    //更改file中的相对position位置
                    file.setWrotePosition((int) (offset % this.mappedFileSize));
                    file.setCommittedPosition((int) (offset % this.mappedFileSize));
                    file.setFlushedPosition((int) (offset % this.mappedFileSize));
                } else {//文件开头就比offset大的,清除掉
                    file.destroy(1000);//关闭fileChannel,删除文件
                    willRemoveFiles.add(file);
                }
            }
        }

        this.deleteExpiredFile(willRemoveFiles);
    }

deleteExpiredFile

从mappedFiles中删除对应的files记录

    void deleteExpiredFile(List<MappedFile> files) {

        if (!files.isEmpty()) {

            Iterator<MappedFile> iterator = files.iterator();
            while (iterator.hasNext()) {
                MappedFile cur = iterator.next();
                if (!this.mappedFiles.contains(cur)) {
                    iterator.remove();
                    log.info("This mappedFile {} is not contained by mappedFiles, so skip it.", cur.getFileName());
                }
            }

            try {
                if (!this.mappedFiles.removeAll(files)) {
                    log.error("deleteExpiredFile remove failed.");
                }
            } catch (Exception e) {
                log.error("deleteExpiredFile has exception.", e);
            }
        }
    }

deleteLastMappedFile

删除,清理最后一个mappedFile

    public void deleteLastMappedFile() {
        MappedFile lastMappedFile = getLastMappedFile();
        if (lastMappedFile != null) {
            lastMappedFile.destroy(1000);
            this.mappedFiles.remove(lastMappedFile);
            log.info("on recover, destroy a logic mapped file " + lastMappedFile.getFileName());

        }
    }

deleteExpiredFileByTime

根据expiredTime删除过期文件,返回删除文件的数量

    public int deleteExpiredFileByTime(final long expiredTime,
        final int deleteFilesInterval,
        final long intervalForcibly,
        final boolean cleanImmediately) {
        Object[] mfs = this.copyMappedFiles(0);//转化mappedFiles成为 object[]数组

        if (null == mfs)
            return 0;

        int mfsLength = mfs.length - 1;
        int deleteCount = 0;
        List<MappedFile> files = new ArrayList<MappedFile>();
        if (null != mfs) {
            for (int i = 0; i < mfsLength; i++) {
                MappedFile mappedFile = (MappedFile) mfs[i];
                long liveMaxTimestamp = mappedFile.getLastModifiedTimestamp() + expiredTime;
                if (System.currentTimeMillis() >= liveMaxTimestamp || cleanImmediately) {//过期了,或者立即清除
                    if (mappedFile.destroy(intervalForcibly)) {//在intervalForcibly时间内删除文件
                        files.add(mappedFile);
                        deleteCount++;

                        if (files.size() >= DELETE_FILES_BATCH_MAX) {//一次最多删除10个
                            break;
                        }

                        if (deleteFilesInterval > 0 && (i + 1) < mfsLength) {
                            try {
                                Thread.sleep(deleteFilesInterval);//删除了一个文件之后,等待一段时间再删除下一个
                            } catch (InterruptedException e) {
                            }
                        }
                    } else {
                        break;
                    }
                }
            }
        }

        deleteExpiredFile(files);//从mappedFiles中删除记录

        return deleteCount;
    }

retryDeleteFirstFile

删除第一个文件

    public boolean retryDeleteFirstFile(final long intervalForcibly) {
        MappedFile mappedFile = this.getFirstMappedFile();
        if (mappedFile != null) {
            if (!mappedFile.isAvailable()) {
                log.warn("the mappedFile was destroyed once, but still alive, " + mappedFile.getFileName());
                boolean result = mappedFile.destroy(intervalForcibly);
                if (result) {
                    log.info("the mappedFile re delete OK, " + mappedFile.getFileName());
                    List<MappedFile> tmpFiles = new ArrayList<MappedFile>();
                    tmpFiles.add(mappedFile);
                    this.deleteExpiredFile(tmpFiles);
                } else {
                    log.warn("the mappedFile re delete failed, " + mappedFile.getFileName());
                }

                return result;
            }
        }

        return false;
    }

deleteExpiredFileByOffset

/**
* 这里假定每个mappedFile的存储单元都是uniteSize这么大,而且0,unitSize等这些位置,存储的是一个long offset值
* 在所有mappedFile中,找到最后一条存储单元记录(mappedFileSize - unitSize),读取long代表该mappedFile的maxOffsetInLogicQueue
* 如果 maxOffsetInLogicQueue < offset 就删除掉
*/

    public int deleteExpiredFileByOffset(long offset, int unitSize) {
        Object[] mfs = this.copyMappedFiles(0);

        List<MappedFile> files = new ArrayList<MappedFile>();
        int deleteCount = 0;
        if (null != mfs) {

            int mfsLength = mfs.length - 1;

            for (int i = 0; i < mfsLength; i++) {
                boolean destroy;
                MappedFile mappedFile = (MappedFile) mfs[i];
                SelectMappedBufferResult result = mappedFile.selectMappedBuffer(this.mappedFileSize - unitSize);//截至最后一个unit部分
                if (result != null) {
                    long maxOffsetInLogicQueue = result.getByteBuffer().getLong();//该文件最大的offSet
                    result.release();
                    destroy = maxOffsetInLogicQueue < offset;//是否需要删除
                    if (destroy) {
                        log.info("physic min offset " + offset + ", logics in current mappedFile max offset "
                            + maxOffsetInLogicQueue + ", delete it");
                    }
                } else if (!mappedFile.isAvailable()) { // Handle hanged file.
                    log.warn("Found a hanged consume queue file, attempting to delete it.");
                    destroy = true;
                } else {
                    log.warn("this being not executed forever.");
                    break;
                }

                if (destroy && mappedFile.destroy(1000 * 60)) {
                    files.add(mappedFile);
                    deleteCount++;
                } else {
                    break;
                }
            }
        }

        deleteExpiredFile(files);//删除记录

        return deleteCount;
    }

flush 与 commit

    /**
     * 从上一次flush位置对应的MappedFile,进行flush
     * 更新flushedWhere
     * 返回false代表真正flush了,true代表没有变
     */
    public boolean flush(final int flushLeastPages) {
        boolean result = true;
        MappedFile mappedFile = this.findMappedFileByOffset(this.flushedWhere, false);//已经刷到磁盘的位置,找到对应的mappedFile
        if (mappedFile != null) {
            long tmpTimeStamp = mappedFile.getStoreTimestamp();
            int offset = mappedFile.flush(flushLeastPages);//得到新的刷到的位置(相对该mappedFile的位置)
            long where = mappedFile.getFileFromOffset() + offset;
            result = where == this.flushedWhere;//如果flush成功了result会为false
            this.flushedWhere = where;
            if (0 == flushLeastPages) {
                this.storeTimestamp = tmpTimeStamp;
            }
        }

        return result;
    }

    /**
     * 上一次的committedWhere对应的mappedFile执行commit
     * 更新committedWhere
     * result为false代表真正的commit了
     */
    public boolean commit(final int commitLeastPages) {
        boolean result = true;
        MappedFile mappedFile = this.findMappedFileByOffset(this.committedWhere, false);
        if (mappedFile != null) {
            int offset = mappedFile.commit(commitLeastPages);//执行commit之后,mappedFile记录的相对commit的位置
            long where = mappedFile.getFileFromOffset() + offset;
            result = where == this.committedWhere;//false才代表commit执行了
            this.committedWhere = where;
        }

        return result;
    }

加载

load函数

/**
* 类似于恢复,重启时加载数据
* 读取storePath下面所有文件,对于大小是mappedFileSize记录在队列中
* 设置commit,wrote,flushPosition为mappedFileSize(类似于重启之后,标记这些位置都处理过)
*/

    public boolean load() {
        File dir = new File(this.storePath);
        File[] files = dir.listFiles();
        if (files != null) {
            // ascending order
            Arrays.sort(files);
            for (File file : files) {

                if (file.length() != this.mappedFileSize) {
                    log.warn(file + "\t" + file.length()
                        + " length not matched message store config value, ignore it");
                    return true;//大小不一样直接返回,一般在最后一个mappedFile,没有写完
                }

                try {
                    MappedFile mappedFile = new MappedFile(file.getPath(), mappedFileSize);
                    //前面存在的mappedFile是以前已经处理完的,这里再表识一下
                    mappedFile.setWrotePosition(this.mappedFileSize);
                    mappedFile.setFlushedPosition(this.mappedFileSize);
                    mappedFile.setCommittedPosition(this.mappedFileSize);
                    this.mappedFiles.add(mappedFile);//添加进mappedFiles队列
                    log.info("load " + file.getPath() + " OK");
                } catch (IOException e) {
                    log.error("load file " + file + " error", e);
                    return false;
                }
            }
        }

        return true;
    }

destroy

    //清除所有mappedFiles
    public void destroy() {
        for (MappedFile mf : this.mappedFiles) {
            mf.destroy(1000 * 3);
        }
        this.mappedFiles.clear();
        this.flushedWhere = 0;

        // delete parent directory
        File file = new File(storePath);
        if (file.isDirectory()) {
            file.delete();
        }
    }

其他

copyMappedFiles完成

/**
* 把mappedFiles 转成Object[]
* 如果队列大小 <= reservedMappedFiles, 那么就返回null
* 调用方的reservedMappedFiles都是0
*/

    private Object[] copyMappedFiles(final int reservedMappedFiles) {
        Object[] mfs;

        if (this.mappedFiles.size() <= reservedMappedFiles) {
            return null;
        }

        mfs = this.mappedFiles.toArray();
        return mfs;
    }

还有一些没有调用到的函数,就没有讲

思考

remainHowManyDataToCommit与remainHowManyDataToFlush函数区别

remainHowManyDataToCommit:最新的MappedFile的写到的位置 与 当前commit位置的差距
remainHowManyDataToFlush:最新的MappedFile的commit的位置 与 当前flush位置的差距
即write >= commit >= flush位置

truncateDirtyFiles时,包含有指定offset的mappedFile后续记录是如何清除的

通过指定wrote,commit,flush的position即可

问题

暂时不知道为什么有这么多函数,哪些函数是重要的

吐槽

copyMappedFiles的参数

让人莫名奇妙,好在调用方都是0

resetOffset的执行

代码中
ListIterator<MappedFile> iterator = this.mappedFiles.listIterator();//得到一个iter
参考实现,在执行while (iterator.hasPrevious()) 一定会返回false的不会为true
我跑去提了issue
https://issues.apache.org/jira/browse/ROCKETMQ-303

refer

http://ifeve.com/java-copy-on-write/ CopyOnWriteArrayList

上一篇 下一篇

猜你喜欢

热点阅读