store模块阅读11:MappedFileQueue
说明
这个类是管理MappedFile的队列的,与MappedFile是1:n的关系
作用:
提供了一系列根据offset或者timestamp来查找,定位,删除,清除MappedFile的逻辑
提供flushedWhere,committedWhere 标记整个队列中flush以及commit的offset
属性
private static final Logger log = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
private static final Logger LOG_ERROR = LoggerFactory.getLogger(LoggerName.STORE_ERROR_LOGGER_NAME);
private static final int DELETE_FILES_BATCH_MAX = 10;//一次最多删除的文件数量
private final String storePath;//文件队列的存储路径
private final int mappedFileSize;//一个mappedFile文件大小,见MessageStoreConfig.mapedFileSizeCommitLog,默认1G
private final CopyOnWriteArrayList<MappedFile> mappedFiles = new CopyOnWriteArrayList<MappedFile>();//写时复制,
private final AllocateMappedFileService allocateMappedFileService;//分配mappedFile的线程服务
private long flushedWhere = 0;//已经flush到的位置(是某一个mappedFile中的一个位置)
private long committedWhere = 0;//已经commit到的位置(是某一个mappedFile中的一个位置)
private volatile long storeTimestamp = 0;
注意:
CopyOnWriteArrayList队列:是写的时候copy一个出来写,写完了再把引用指过去,参照refer
flushedWhere, committedWhere 代表flush和commit的偏移地址
函数
构造函数
public MappedFileQueue(final String storePath, int mappedFileSize,
AllocateMappedFileService allocateMappedFileService) {
this.storePath = storePath;
this.mappedFileSize = mappedFileSize;
this.allocateMappedFileService = allocateMappedFileService;
}
分别是存储路径,文件大小,分配服务
检查
checkSelf完成自检
检查mappedFiles中,除去最后一个文件,其余每一个mappedFile的大小是否是mappedFileSize
public void checkSelf() {
if (!this.mappedFiles.isEmpty()) {
Iterator<MappedFile> iterator = mappedFiles.iterator();
MappedFile pre = null;
while (iterator.hasNext()) {
MappedFile cur = iterator.next();
if (pre != null) {
if (cur.getFileFromOffset() - pre.getFileFromOffset() != this.mappedFileSize) {
LOG_ERROR.error("[BUG]The mappedFile queue's data is damaged, the adjacent mappedFile's offset don't match. pre file {}, cur file {}",
pre.getFileName(), cur.getFileName());
}
}
pre = cur;
}
}
}
get属性信息相关
这里涉及commit以及flush位置的获取,判断
函数 | 作用 |
---|---|
getMaxOffset | 实际上就是commit的位置 |
getMaxWrotePosition | wrote到的位置 |
remainHowManyDataToCommit | wrote到的位置,到queue中记录的commit的位置之差 |
remainHowManyDataToFlush | commit到的位置,到queue中记录的flush的位置之差 |
如下
/**
* 获取最大偏移量
* 即最后一个MappedFile允许读到的位置
*/
public long getMaxOffset() {
MappedFile mappedFile = getLastMappedFile();
if (mappedFile != null) {
return mappedFile.getFileFromOffset() + mappedFile.getReadPosition();//相对偏移 + 允许读到的位置
}
return 0;
}
/**
* 获取最大写的位置
* 即最后一个MappedFile写到的位置
*/
public long getMaxWrotePosition() {
MappedFile mappedFile = getLastMappedFile();
if (mappedFile != null) {
return mappedFile.getFileFromOffset() + mappedFile.getWrotePosition();
}
return 0;
}
/**
* 还有多少字节等待commit的
* 即wrote与commit位置之差
*/
public long remainHowManyDataToCommit() {
return getMaxWrotePosition() - committedWhere;
}
/**
* 还有多少字节等待flush的
* 即flush与commit位置之差
*/
public long remainHowManyDataToFlush() {
return getMaxOffset() - flushedWhere;
}
getMappedFile相关
getMappedFileByTime
获取最后修改时间在timestamp之后的第一个mappedFile,没有的话就返回最后一个mappedFile
public MappedFile getMappedFileByTime(final long timestamp) {
Object[] mfs = this.copyMappedFiles(0);
if (null == mfs)
return null;
for (int i = 0; i < mfs.length; i++) {
MappedFile mappedFile = (MappedFile) mfs[i];
if (mappedFile.getLastModifiedTimestamp() >= timestamp) {
return mappedFile;
}
}
return (MappedFile) mfs[mfs.length - 1];
}
findMappedFileByOffset
通过offset找到所在的mappedFile
public MappedFile findMappedFileByOffset(final long offset, final boolean returnFirstOnNotFound) {
try {
MappedFile mappedFile = this.getFirstMappedFile();
if (mappedFile != null) {
int index = (int) ((offset / this.mappedFileSize) - (mappedFile.getFileFromOffset() / this.mappedFileSize));
if (index < 0 || index >= this.mappedFiles.size()) {
LOG_ERROR.warn("Offset for {} not matched. Request offset: {}, index: {}, " +
"mappedFileSize: {}, mappedFiles count: {}",
mappedFile,
offset,
index,
this.mappedFileSize,
this.mappedFiles.size());
}
try {
return this.mappedFiles.get(index);
} catch (Exception e) {
if (returnFirstOnNotFound) {
return mappedFile;//遇到异常,允许返回第一个
}
LOG_ERROR.warn("findMappedFileByOffset failure. ", e);
}
}
} catch (Exception e) {
log.error("findMappedFileByOffset Exception", e);
}
return null;
}
public MappedFile findMappedFileByOffset(final long offset) {
return findMappedFileByOffset(offset, false);
}
getFirstMappedFile
返回mappedFiles队列中的第一个文件
public MappedFile getFirstMappedFile() {
MappedFile mappedFileFirst = null;
if (!this.mappedFiles.isEmpty()) {
try {
mappedFileFirst = this.mappedFiles.get(0);
} catch (IndexOutOfBoundsException e) {
//ignore
} catch (Exception e) {
log.error("getFirstMappedFile has exception.", e);
}
}
return mappedFileFirst;
}
getLastMappedFile
有几种形式
/**
* 返回mappedFiles中最后一个mappedFile,
* 如果mappedFiles为空,根据startOffset以及needCreate判断是否需要创建出来最新的mappedFile
* 如果mappedFiles最后一个写满了,根据needCreate判断是否需要创建出来最新的mappedFile
*/
public MappedFile getLastMappedFile(final long startOffset, boolean needCreate) {
long createOffset = -1;
MappedFile mappedFileLast = getLastMappedFile();
if (mappedFileLast == null) {
createOffset = startOffset - (startOffset % this.mappedFileSize);
}
if (mappedFileLast != null && mappedFileLast.isFull()) {
createOffset = mappedFileLast.getFileFromOffset() + this.mappedFileSize;
}
if (createOffset != -1 && needCreate) {
String nextFilePath = this.storePath + File.separator + UtilAll.offset2FileName(createOffset);
String nextNextFilePath = this.storePath + File.separator
+ UtilAll.offset2FileName(createOffset + this.mappedFileSize);
MappedFile mappedFile = null;
if (this.allocateMappedFileService != null) {
mappedFile = this.allocateMappedFileService.putRequestAndReturnMappedFile(nextFilePath,
nextNextFilePath, this.mappedFileSize);//创建nextFilePath,并将nextNextFilePath放入队列,以便异步处理
} else {
try {
mappedFile = new MappedFile(nextFilePath, this.mappedFileSize);//同步创建
} catch (IOException e) {
log.error("create mappedFile exception", e);
}
}
if (mappedFile != null) {
if (this.mappedFiles.isEmpty()) {
mappedFile.setFirstCreateInQueue(true);//队列中创建的第一个
}
this.mappedFiles.add(mappedFile);
}
return mappedFile;
}
return mappedFileLast;
}
/**
* 返回mappedFiles中最后一个mappedFile,
* 如果mappedFiles为空,根据startOffset创建出来最新的mappedFile
* 如果mappedFiles最后一个写满了,则创建出来最新的mappedFile
*/
public MappedFile getLastMappedFile(final long startOffset) {
return getLastMappedFile(startOffset, true);
}
//返回mappedFiles中最后一个mappedFile
public MappedFile getLastMappedFile() {
MappedFile mappedFileLast = null;
while (!this.mappedFiles.isEmpty()) {
try {
mappedFileLast = this.mappedFiles.get(this.mappedFiles.size() - 1);
break;
} catch (IndexOutOfBoundsException e) {
//continue;
} catch (Exception e) {
log.error("getLastMappedFile has exception.", e);
break;
}
}
return mappedFileLast;
}
reset相关
将offset以后的MappedFile都清除掉,但是代码似乎有bug,在吐槽中说
public boolean resetOffset(long offset) {
MappedFile mappedFileLast = getLastMappedFile();
if (mappedFileLast != null) {
long lastOffset = mappedFileLast.getFileFromOffset() +
mappedFileLast.getWrotePosition();
long diff = lastOffset - offset;//最后写到的位置 与 要求的offset之间的差距
final int maxDiff = this.mappedFileSize * 2;
if (diff > maxDiff)//如果超过了2个fileSize就失败
return false;
}
ListIterator<MappedFile> iterator = this.mappedFiles.listIterator();//得到一个iter
while (iterator.hasPrevious()) {//这里似乎是bug,总会返回false,因为上一行cursor会是0
mappedFileLast = iterator.previous();
if (offset >= mappedFileLast.getFileFromOffset()) {
int where = (int) (offset % mappedFileLast.getFileSize());
mappedFileLast.setFlushedPosition(where);
mappedFileLast.setWrotePosition(where);
mappedFileLast.setCommittedPosition(where);
break;
} else {
iterator.remove();
}
}
return true;
}
清理
truncateDirtyFiles
/**
* 处理offset以上的MappedFile,认为是dirty的
* MappedFile包含了offset的去掉offset后续部分
* MappedFile超过了offset的直接删掉
*/
public void truncateDirtyFiles(long offset) {
List<MappedFile> willRemoveFiles = new ArrayList<MappedFile>();
for (MappedFile file : this.mappedFiles) {
long fileTailOffset = file.getFileFromOffset() + this.mappedFileSize;//得到每个MappedFile结束时偏移
if (fileTailOffset > offset) {
if (offset >= file.getFileFromOffset()) {//包含有offset的文件,去掉后续尾巴
//更改file中的相对position位置
file.setWrotePosition((int) (offset % this.mappedFileSize));
file.setCommittedPosition((int) (offset % this.mappedFileSize));
file.setFlushedPosition((int) (offset % this.mappedFileSize));
} else {//文件开头就比offset大的,清除掉
file.destroy(1000);//关闭fileChannel,删除文件
willRemoveFiles.add(file);
}
}
}
this.deleteExpiredFile(willRemoveFiles);
}
deleteExpiredFile
从mappedFiles中删除对应的files记录
void deleteExpiredFile(List<MappedFile> files) {
if (!files.isEmpty()) {
Iterator<MappedFile> iterator = files.iterator();
while (iterator.hasNext()) {
MappedFile cur = iterator.next();
if (!this.mappedFiles.contains(cur)) {
iterator.remove();
log.info("This mappedFile {} is not contained by mappedFiles, so skip it.", cur.getFileName());
}
}
try {
if (!this.mappedFiles.removeAll(files)) {
log.error("deleteExpiredFile remove failed.");
}
} catch (Exception e) {
log.error("deleteExpiredFile has exception.", e);
}
}
}
deleteLastMappedFile
删除,清理最后一个mappedFile
public void deleteLastMappedFile() {
MappedFile lastMappedFile = getLastMappedFile();
if (lastMappedFile != null) {
lastMappedFile.destroy(1000);
this.mappedFiles.remove(lastMappedFile);
log.info("on recover, destroy a logic mapped file " + lastMappedFile.getFileName());
}
}
deleteExpiredFileByTime
根据expiredTime删除过期文件,返回删除文件的数量
public int deleteExpiredFileByTime(final long expiredTime,
final int deleteFilesInterval,
final long intervalForcibly,
final boolean cleanImmediately) {
Object[] mfs = this.copyMappedFiles(0);//转化mappedFiles成为 object[]数组
if (null == mfs)
return 0;
int mfsLength = mfs.length - 1;
int deleteCount = 0;
List<MappedFile> files = new ArrayList<MappedFile>();
if (null != mfs) {
for (int i = 0; i < mfsLength; i++) {
MappedFile mappedFile = (MappedFile) mfs[i];
long liveMaxTimestamp = mappedFile.getLastModifiedTimestamp() + expiredTime;
if (System.currentTimeMillis() >= liveMaxTimestamp || cleanImmediately) {//过期了,或者立即清除
if (mappedFile.destroy(intervalForcibly)) {//在intervalForcibly时间内删除文件
files.add(mappedFile);
deleteCount++;
if (files.size() >= DELETE_FILES_BATCH_MAX) {//一次最多删除10个
break;
}
if (deleteFilesInterval > 0 && (i + 1) < mfsLength) {
try {
Thread.sleep(deleteFilesInterval);//删除了一个文件之后,等待一段时间再删除下一个
} catch (InterruptedException e) {
}
}
} else {
break;
}
}
}
}
deleteExpiredFile(files);//从mappedFiles中删除记录
return deleteCount;
}
retryDeleteFirstFile
删除第一个文件
public boolean retryDeleteFirstFile(final long intervalForcibly) {
MappedFile mappedFile = this.getFirstMappedFile();
if (mappedFile != null) {
if (!mappedFile.isAvailable()) {
log.warn("the mappedFile was destroyed once, but still alive, " + mappedFile.getFileName());
boolean result = mappedFile.destroy(intervalForcibly);
if (result) {
log.info("the mappedFile re delete OK, " + mappedFile.getFileName());
List<MappedFile> tmpFiles = new ArrayList<MappedFile>();
tmpFiles.add(mappedFile);
this.deleteExpiredFile(tmpFiles);
} else {
log.warn("the mappedFile re delete failed, " + mappedFile.getFileName());
}
return result;
}
}
return false;
}
deleteExpiredFileByOffset
/**
* 这里假定每个mappedFile的存储单元都是uniteSize这么大,而且0,unitSize等这些位置,存储的是一个long offset值
* 在所有mappedFile中,找到最后一条存储单元记录(mappedFileSize - unitSize),读取long代表该mappedFile的maxOffsetInLogicQueue
* 如果 maxOffsetInLogicQueue < offset 就删除掉
*/
public int deleteExpiredFileByOffset(long offset, int unitSize) {
Object[] mfs = this.copyMappedFiles(0);
List<MappedFile> files = new ArrayList<MappedFile>();
int deleteCount = 0;
if (null != mfs) {
int mfsLength = mfs.length - 1;
for (int i = 0; i < mfsLength; i++) {
boolean destroy;
MappedFile mappedFile = (MappedFile) mfs[i];
SelectMappedBufferResult result = mappedFile.selectMappedBuffer(this.mappedFileSize - unitSize);//截至最后一个unit部分
if (result != null) {
long maxOffsetInLogicQueue = result.getByteBuffer().getLong();//该文件最大的offSet
result.release();
destroy = maxOffsetInLogicQueue < offset;//是否需要删除
if (destroy) {
log.info("physic min offset " + offset + ", logics in current mappedFile max offset "
+ maxOffsetInLogicQueue + ", delete it");
}
} else if (!mappedFile.isAvailable()) { // Handle hanged file.
log.warn("Found a hanged consume queue file, attempting to delete it.");
destroy = true;
} else {
log.warn("this being not executed forever.");
break;
}
if (destroy && mappedFile.destroy(1000 * 60)) {
files.add(mappedFile);
deleteCount++;
} else {
break;
}
}
}
deleteExpiredFile(files);//删除记录
return deleteCount;
}
flush 与 commit
/**
* 从上一次flush位置对应的MappedFile,进行flush
* 更新flushedWhere
* 返回false代表真正flush了,true代表没有变
*/
public boolean flush(final int flushLeastPages) {
boolean result = true;
MappedFile mappedFile = this.findMappedFileByOffset(this.flushedWhere, false);//已经刷到磁盘的位置,找到对应的mappedFile
if (mappedFile != null) {
long tmpTimeStamp = mappedFile.getStoreTimestamp();
int offset = mappedFile.flush(flushLeastPages);//得到新的刷到的位置(相对该mappedFile的位置)
long where = mappedFile.getFileFromOffset() + offset;
result = where == this.flushedWhere;//如果flush成功了result会为false
this.flushedWhere = where;
if (0 == flushLeastPages) {
this.storeTimestamp = tmpTimeStamp;
}
}
return result;
}
/**
* 上一次的committedWhere对应的mappedFile执行commit
* 更新committedWhere
* result为false代表真正的commit了
*/
public boolean commit(final int commitLeastPages) {
boolean result = true;
MappedFile mappedFile = this.findMappedFileByOffset(this.committedWhere, false);
if (mappedFile != null) {
int offset = mappedFile.commit(commitLeastPages);//执行commit之后,mappedFile记录的相对commit的位置
long where = mappedFile.getFileFromOffset() + offset;
result = where == this.committedWhere;//false才代表commit执行了
this.committedWhere = where;
}
return result;
}
加载
load函数
/**
* 类似于恢复,重启时加载数据
* 读取storePath下面所有文件,对于大小是mappedFileSize记录在队列中
* 设置commit,wrote,flushPosition为mappedFileSize(类似于重启之后,标记这些位置都处理过)
*/
public boolean load() {
File dir = new File(this.storePath);
File[] files = dir.listFiles();
if (files != null) {
// ascending order
Arrays.sort(files);
for (File file : files) {
if (file.length() != this.mappedFileSize) {
log.warn(file + "\t" + file.length()
+ " length not matched message store config value, ignore it");
return true;//大小不一样直接返回,一般在最后一个mappedFile,没有写完
}
try {
MappedFile mappedFile = new MappedFile(file.getPath(), mappedFileSize);
//前面存在的mappedFile是以前已经处理完的,这里再表识一下
mappedFile.setWrotePosition(this.mappedFileSize);
mappedFile.setFlushedPosition(this.mappedFileSize);
mappedFile.setCommittedPosition(this.mappedFileSize);
this.mappedFiles.add(mappedFile);//添加进mappedFiles队列
log.info("load " + file.getPath() + " OK");
} catch (IOException e) {
log.error("load file " + file + " error", e);
return false;
}
}
}
return true;
}
destroy
//清除所有mappedFiles
public void destroy() {
for (MappedFile mf : this.mappedFiles) {
mf.destroy(1000 * 3);
}
this.mappedFiles.clear();
this.flushedWhere = 0;
// delete parent directory
File file = new File(storePath);
if (file.isDirectory()) {
file.delete();
}
}
其他
copyMappedFiles完成
/**
* 把mappedFiles 转成Object[]
* 如果队列大小 <= reservedMappedFiles, 那么就返回null
* 调用方的reservedMappedFiles都是0
*/
private Object[] copyMappedFiles(final int reservedMappedFiles) {
Object[] mfs;
if (this.mappedFiles.size() <= reservedMappedFiles) {
return null;
}
mfs = this.mappedFiles.toArray();
return mfs;
}
还有一些没有调用到的函数,就没有讲
思考
remainHowManyDataToCommit与remainHowManyDataToFlush函数区别
remainHowManyDataToCommit:最新的MappedFile的写到的位置 与 当前commit位置的差距
remainHowManyDataToFlush:最新的MappedFile的commit的位置 与 当前flush位置的差距
即write >= commit >= flush位置
truncateDirtyFiles时,包含有指定offset的mappedFile后续记录是如何清除的
通过指定wrote,commit,flush的position即可
问题
暂时不知道为什么有这么多函数,哪些函数是重要的
吐槽
copyMappedFiles的参数
让人莫名奇妙,好在调用方都是0
resetOffset的执行
代码中
ListIterator<MappedFile> iterator = this.mappedFiles.listIterator();//得到一个iter
参考实现,在执行while (iterator.hasPrevious()) 一定会返回false的不会为true
我跑去提了issue
https://issues.apache.org/jira/browse/ROCKETMQ-303
refer
http://ifeve.com/java-copy-on-write/ CopyOnWriteArrayList