Java 杂谈程序员阿里云

rocketmq存储

2019-03-18  本文已影响3人  WANGGGGG

因为业务需要,所以学习了两天相关的源码,当有同事问我相关存储的机制时,我写了这个文档
因为是给同行看的,所以这里就直接开始看源码(大篇幅的贴源代码看着会有点费劲,如果有兴趣可以根据方法名去github上下载源代码来看)

public PutMessageResult putMessage(final MessageExtBrokerInner msg) {     
………........……….
调用mapedFile,写文件
 result = mapedFile.appendMessage(msg, this.appendMessageCallback);
…………....……………….
}

public AppendMessageResult appendMessage(final Object msg, final AppendMessageCallback cb) {
cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, msg);
}

public AppendMessageResult doAppend(final long fileFromOffset,
                     final ByteBuffer byteBuffer, final int maxBlank, final Object msg) {
     // Write messages to the queue buffer
    byteBuffer.put(this.msgStoreItemMemory.array(), 0, msgLen);
}

msgStoreItemMemory是一个内存映射文件MappedByteBuffer
将filechannle中的全部或者部分映射到内存中
MappedByteBuffer mbb = fc.map( FileChannel.MapMode.READ_WRITE, 0, 1024 );
好了,方法调用到这里你可以发现,消息已经被存储到了内存中,还没有被持久化,所以我们继续研究。
消息持久化呢有两种方式,同步刷盘和异步刷盘
我们直接来看源码

if (FlushDiskType.SYNC_FLUSH== this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
      GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
          …….
        启用同步刷盘线程进行刷盘
                service.wakeup();
        …….
        // Asynchronous flush
        else {
        启用flushCommitLogService线程,进行异步刷盘喽
            this.flushCommitLogService.wakeup();
 }

下面先讲解一下同步刷盘

GroupCommitService service = (GroupCommitService) this.flushCommitLogService
private void doCommit() {
        ……..
       线程调用这个方法将文件持久化写入硬盘中
       CommitLog.this.mapedFileQueue.commit(0);
       …………
}

CommitLog初始化

this.mapedFileQueue = new MapedFileQueue(
           defaultMessageStore.getMessageStoreConfig().getStorePathCommitLog(),
           defaultMessageStore.getMessageStoreConfig().getMapedFileSizeCommitLog(), 
           defaultMessageStore.getAllocateMapedFileService());
说白了就是
this.mapedFileQueue = new MapedFileQueue(文件路径,文件大小,存文件的实际服务,叫它方法也行)
看完初始化,揭开commit的神秘面纱

public boolean commit(final int flushLeastPages) {
        MapedFile mapedFile = this.findMapedFileByOffset(this.committedWhere, true);
            int offset = mapedFile.commit(flushLeastPages);
}

到这里我们还没有看到MapedFile没有初始化,它在MapedFileQueue中的load方法中初始化

public boolean load() {
  MapedFile mapedFile = new MapedFile(file.getPath(), mapedFileSize);
}

  //刷盘,内存到硬盘                          
public int commit(final int flushLeastPages) {
    this.mappedByteBuffer.force();
}

但是同步刷盘会影响我们消息收发的整体性能,你想一下,如果你发一条数据,就调用io写入硬盘,当然只有一条数据,确实没问题,但是数量大的时候,开启大量线程,或者线程池中的线程被占用来写数据了,那肯定会影响它收发消息了,毕竟系统的整体性能是固定的。

这个时候就要提到异步刷盘的概念了,即收集一批数据后在写入硬盘中,可以极大的降低所消耗的性能。那么什么时候进行异步刷盘呢?异步刷盘又分为定时刷盘和实时刷盘,一般默认的是实时刷盘,接下展示rocketmq的刷盘机制。

//是否定时方式刷盘,默认是实时刷盘real time
  boolean flushCommitLogTimed =
  CommitLog.this.defaultMessageStore.getMessageStoreConfig().isFlushCommitLogTimed();
 //CommitLog刷盘间隔时间 default 1s
  int interval =
  CommitLog.this.defaultMessageStore.getMessageStoreConfig() .getFlushIntervalCommitLog();
 //刷CommitLog,至少刷几个PAGE default 4
 int flushPhysicQueueLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig()
                            .getFlushCommitLogLeastPages();
 //刷CommitLog,彻底刷盘间隔时间  default 10s
 int flushPhysicQueueThoroughInterval =
            CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogThoroughInterval();
            …………
             进行刷盘
            CommitLog.this.mapedFileQueue.commit(flushPhysicQueueLeastPages);
            …………
接下来就和同步刷盘方式一致了,都是写入硬盘的操作了

删除文件机制

deleteCount = DefaultMessageStore.this.commitLog.deleteExpiredFile(fileReservedTime, deletePhysicFilesInterval,
        destroyMapedFileIntervalForcibly, cleanAtOnce);
public int deleteExpiredFile(
                     final long expiredTime, 
                     final int deleteFilesInterval, 
                     final long intervalForcibly, 
                     final boolean cleanImmediately
) {
    return this.mapedFileQueue.deleteExpiredFileByTime(expiredTime, deleteFilesInterval, intervalForcibly, cleanImmediately);
}
public int deleteExpiredFileByTime(
                                  final long expiredTime, 
                                  final int deleteFilesInterval, 
                                  final long intervalForcibly,
                                  final boolean cleanImmediately
) {
    Object[] mfs = this.copyMapedFiles(0);
    if (null == mfs)
        return 0;
    int mfsLength = mfs.length - 1;
    int deleteCount = 0;
    List files = new ArrayList();
    if (null != mfs) {
        for (int i = 0; i < mfsLength; i++) {
            MapedFile mapedFile = (MapedFile) mfs[i];
            long liveMaxTimestamp = mapedFile.getLastModifiedTimestamp() + expiredTime;
            if (System.currentTimeMillis() >= liveMaxTimestamp
                    || cleanImmediately) {
                if (mapedFile.destroy(intervalForcibly)) {
                    files.add(mapedFile);
                    deleteCount++;
                    if (files.size() >= DeleteFilesBatchMax) {
                        break;
                    }
                    if (deleteFilesInterval > 0 && (i + 1) < mfsLength) {
                        try {
                            Thread.sleep(deleteFilesInterval);
                        } catch (InterruptedException e) {
                        }
                    }
                } else {
                    break;
                }
            }
        }
    }
    deleteExpiredFile(files);
    return deleteCount;
}
上一篇 下一篇

猜你喜欢

热点阅读