

2019-03-25  本文已影响4人  扫帚的影子


│ Header │               Blocks               │    Index    │    Footer    │
│5 bytes │              N bytes               │   N bytes   │   8 bytes    │
│      Header       │
│  Magic  │ Version │
│ 4 bytes │ 1 byte  │
│                          Blocks                           │
│      Block 1      │      Block 2      │      Block N      │
│  CRC    │  Data   │  CRC    │  Data   │  CRC    │  Data   │
│ 4 bytes │ N bytes │ 4 bytes │ N bytes │ 4 bytes │ N bytes │
│                                   Index                                    │
│ Key Len │   Key   │ Type │ Count │Min Time │Max Time │ Offset │  Size  │...│
│ 2 bytes │ N bytes │1 byte│2 bytes│ 8 bytes │ 8 bytes │8 bytes │4 bytes │   │
│ Key Len │   Key   │ Type │ Count │Min Time │Max Time │ Offset │  Size  │...│
│ 2 bytes │ N bytes │1 byte│2 bytes│ 8 bytes │ 8 bytes │8 bytes │4 bytes │   │
│ Footer  │
│Index Ofs│
│ 8 bytes │


type IndexEntry struct {
    // The min and max time of all points stored in the block.
    MinTime, MaxTime int64

    // The absolute position in the file where this block is located.
    Offset int64

    // The size in bytes of the block in the file.
    Size uint32


func (e *IndexEntry) AppendTo(b []byte) []byte {
    if len(b) < indexEntrySize {
        if cap(b) < indexEntrySize {
            b = make([]byte, indexEntrySize)
        } else {
            b = b[:indexEntrySize]

    binary.BigEndian.PutUint64(b[:8], uint64(e.MinTime))
    binary.BigEndian.PutUint64(b[8:16], uint64(e.MaxTime))
    binary.BigEndian.PutUint64(b[16:24], uint64(e.Offset))
    binary.BigEndian.PutUint32(b[24:28], uint32(e.Size))

    return b
func (e *IndexEntry) UnmarshalBinary(b []byte) error {
    if len(b) < indexEntrySize {
        return fmt.Errorf("unmarshalBinary: short buf: %v < %v", len(b), indexEntrySize)
    e.MinTime = int64(binary.BigEndian.Uint64(b[:8]))
    e.MaxTime = int64(binary.BigEndian.Uint64(b[8:16]))
    e.Offset = int64(binary.BigEndian.Uint64(b[16:24]))
    e.Size = binary.BigEndian.Uint32(b[24:28])
    return nil
// directIndex accumulates the index section of a TSM file while blocks are
// being written. Only one key's entries (key/indexEntries) are held at a time.
// Completed sub-indexes are serialized through w into either an in-memory
// buffer (buf) or a file (fd).
type directIndex struct {
    // keyCount is presumably the number of distinct keys added.
    keyCount int
    // size is the running total of encoded index bytes accounted for by Add.
    size     uint32

    // The bytes written count of when we last fsync'd
    lastSync uint32
    fd       *os.File       // backing file when the index is buffered on disk
    buf      *bytes.Buffer  // backing buffer when the index is buffered in memory

    // f is the destination syncer set by tsmWriter.WriteIndex and passed to
    // copyBuffer, so the destination can be fsync'd while the index is copied.
    f syncer

    // w buffers writes into fd or buf.
    w *bufio.Writer

    // key and indexEntries together form the sub-index currently being built:
    // one key and the entries of its blocks.
    key          []byte
    indexEntries *indexEntries  


  1. 创建基于内存的缓存:
func NewIndexWriter() IndexWriter {
   buf := bytes.NewBuffer(make([]byte, 0, 1024*1024))
   return &directIndex{buf: buf, w: bufio.NewWriter(buf)}
  2. 创建基于Disk的缓存:
func NewDiskIndexWriter(f *os.File) IndexWriter {
   return &directIndex{fd: f, w: bufio.NewWriterSize(f, 1024*1024)}
func (d *directIndex) Add(key []byte, blockType byte, minTime, maxTime int64, offset int64, size uint32) {
    // d.key为空时,写入这个新的key的length, key value等信息,相当于是一个新的子index的开始
    if len(d.key) == 0 {
        // size of the key stored in the index
        d.size += uint32(2 + len(key))
        // size of the count of entries stored in the index
        d.size += indexCountSize

        d.key = key
        if d.indexEntries == nil {
            d.indexEntries = &indexEntries{}
        d.indexEntries.Type = blockType
        d.indexEntries.entries = append(d.indexEntries.entries, IndexEntry{
            MinTime: minTime,
            MaxTime: maxTime,
            Offset:  offset,
            Size:    size,

        // 记录index总的大小和不同key的个数的
        d.size += indexEntrySize

    // See if were still adding to the same series key.
    cmp := bytes.Compare(d.key, key)
    if cmp == 0 {
        // key相同说明当前子index的添加还没有结束
        d.indexEntries.entries = append(d.indexEntries.entries, IndexEntry{
            MinTime: minTime,
            MaxTime: maxTime,
            Offset:  offset,
            Size:    size,

        // size of the encoded index entry
        d.size += indexEntrySize

    } else if cmp < 0 {
        // We have a new key that is greater than the last one so we need to add
        // a new index block section.

        // size of the key stored in the index
        d.size += uint32(2 + len(key))
        // size of the count of entries stored in the index
        d.size += indexCountSize

        d.key = key
        d.indexEntries.Type = blockType
        d.indexEntries.entries = append(d.indexEntries.entries, IndexEntry{
            MinTime: minTime,
            MaxTime: maxTime,
            Offset:  offset,
            Size:    size,

        // size of the encoded index entry
        d.size += indexEntrySize
    } else {
        // Keys can't be added out of order.
        panic(fmt.Sprintf("keys must be added in sorted order: %s < %s", string(key), string(d.key)))
func (d *directIndex) flush(w io.Writer) (int64, error) {
    var (
        n   int
        err error
        buf [5]byte
        N   int64

    if len(d.key) == 0 {
        return 0, nil
    // For each key, individual entries are sorted by time
    key := d.key
    entries := d.indexEntries

    if entries.Len() > maxIndexEntries {
        return N, fmt.Errorf("key '%s' exceeds max index entries: %d > %d", key, entries.Len(), maxIndexEntries)

    // 将IndexEntry按MinTime来排序
    if !sort.IsSorted(entries) {

    binary.BigEndian.PutUint16(buf[0:2], uint16(len(key)))
    buf[2] = entries.Type
    binary.BigEndian.PutUint16(buf[3:5], uint16(entries.Len()))

    // 写入key length
    if n, err = w.Write(buf[0:2]); err != nil {
        return int64(n) + N, fmt.Errorf("write: writer key length error: %v", err)
    N += int64(n)

    // 写入key
    if n, err = w.Write(key); err != nil {
        return int64(n) + N, fmt.Errorf("write: writer key error: %v", err)
    N += int64(n)

    // 写入类型和IndexEntry个数
    if n, err = w.Write(buf[2:5]); err != nil {
        return int64(n) + N, fmt.Errorf("write: writer block type and count error: %v", err)
    N += int64(n)

    // 写每一个IndexEntry
    var n64 int64
    if n64, err = entries.WriteTo(w); err != nil {
        return n64 + N, fmt.Errorf("write: writer entries error: %v", err)
    N += n64

    d.key = nil
    d.indexEntries.Type = 0
    d.indexEntries.entries = d.indexEntries.entries[:0]

    // 如果是基于磁盘文件的缓存,达到阈值后,Sync到磁盘文件 
    if d.fd != nil && d.size-d.lastSync > fsyncEvery {
        if err := d.fd.Sync(); err != nil {
            return N, err
        d.lastSync = d.size

    return N, nil
func (d *directIndex) WriteTo(w io.Writer) (int64, error) {
    // 先flush到d.w中
    if _, err := d.flush(d.w); err != nil {
        return 0, err

    // 再从d.w中flush到内存buffer或文件
    if err := d.w.Flush(); err != nil {
        return 0, err

    if d.fd == nil {
        // 如果是缓存到内存,两个buffer作对拷
        return copyBuffer(d.f, w, d.buf, nil)

    // 如果是缓存到磁盘,读磁盘文件拷贝到buffer,这个地方是否需要调用d.fd.Sync()?
    if _, err := d.fd.Seek(0, io.SeekStart); err != nil {
        return 0, err

    return io.Copy(w, bufio.NewReaderSize(d.fd, 1024*1024))
// tsmWriter writes a TSM file. Write emits the header and the CRC-prefixed
// data blocks; WriteIndex emits the index and the footer.
type tsmWriter struct {
    wrapped io.Writer     // destination; WriteIndex checks it for syncer
    w       *bufio.Writer // blocks, index and footer are all written through w
    index   IndexWriter   // collects one index entry per written block
    n       int64         // bytes written so far; used as the next block's offset

    // The bytes written count of when we last fsync'd
    lastSync int64
func (t *tsmWriter) Write(key []byte, values Values) error {
    if len(key) > maxKeyLength {
        return ErrMaxKeyLengthExceeded

    // Nothing to write
    if len(values) == 0 {
        return nil

    // Write header only after we have some data to write.
    if t.n == 0 {
        // 先写头
        if err := t.writeHeader(); err != nil {
            return err

    // 将vluaes编码成Block
    block, err := values.Encode(nil)
    if err != nil {
        return err

    blockType, err := BlockType(block)
    if err != nil {
        return err

    // 计算crc
    var checksum [crc32.Size]byte
    binary.BigEndian.PutUint32(checksum[:], crc32.ChecksumIEEE(block))

    // 写入crc
    _, err = t.w.Write(checksum[:])
    if err != nil {
        return err

    // 写入block
    n, err := t.w.Write(block)
    if err != nil {
        return err
    n += len(checksum)

    // 写IndexEntry到Index缓存
    t.index.Add(key, blockType, values[0].UnixNano(), values[len(values)-1].UnixNano(), t.n, uint32(n))

    // Increment file position pointer
    t.n += int64(n)

    if len(t.index.Entries(key)) >= maxIndexEntries {
        return ErrMaxBlocksExceeded

    return nil
func (t *tsmWriter) WriteIndex() error {
    indexPos := t.n

    if t.index.KeyCount() == 0 {
        return ErrNoValues

    // Set the destination file on the index so we can periodically
    // fsync while writing the index.
    if f, ok := t.wrapped.(syncer); ok {
        t.index.(*directIndex).f = f

    // Write the index
    if _, err := t.index.WriteTo(t.w); err != nil {
        return err

    var buf [8]byte
    binary.BigEndian.PutUint64(buf[:], uint64(indexPos))

    // 写footer, 记录index在文件中的offset
    _, err := t.w.Write(buf[:])
    return err


// mmapAccessor reads a TSM file's blocks and index through a memory mapping
// (b) of the file f.
type mmapAccessor struct {
    // These two counters track use of the accessor so it can decide when its
    // resources may be freed.
    accessCount uint64 // Counter incremented everytime the mmapAccessor is accessed
    freeCount   uint64 // Counter to determine whether the accessor can free its resources

    // mmap only maps the file into virtual memory; pages are read from disk
    // when first touched. When this flag is true, init additionally calls
    // madviseWillNeed on the mapping to ask the OS to read it in ahead of use.
    mmapWillNeed bool // If true then mmap advise value MADV_WILLNEED will be provided the kernel for b.

    mu sync.RWMutex
    b  []byte
    f  *os.File

    // The index section of the TSM file is accessed through this indirectIndex.
    index *indirectIndex
// init memory-maps the TSM file and validates and reads the 8-byte footer.
// It then parses the index section into a new indirectIndex, which it stores
// in m.index and returns.
//
// NOTE(review): this excerpt elides lines, namely the m.mu.Lock() paired with
// the deferred Unlock and the m.f.Stat() call that declares stat and err.
func (m *mmapAccessor) init() (*indirectIndex, error) {
    defer m.mu.Unlock()


    // Memory-map the whole TSM file.
    m.b, err = mmap(m.f, 0, int(stat.Size()))
    if err != nil {
        return nil, err

    // The mapping must be at least large enough to hold the 8-byte footer.
    if len(m.b) < 8 {
        return nil, fmt.Errorf("mmapAccessor: byte slice too small for indirectIndex")

    // Hint to the kernel that we will be reading the file.  It would be better to hint
    // that we will be reading the index section, but that's not been
    // implemented as yet.
    // Only done when mmapWillNeed is set.
    if m.mmapWillNeed {
        if err := madviseWillNeed(m.b); err != nil {
            return nil, err

    // The footer (last 8 bytes) holds the offset where the index section
    // starts. That offset must lie before the footer itself.
    indexOfsPos := len(m.b) - 8
    indexStart := binary.BigEndian.Uint64(m.b[indexOfsPos : indexOfsPos+8])
    if indexStart >= uint64(indexOfsPos) {
        return nil, fmt.Errorf("mmapAccessor: invalid indexStart")

    // Parse the index section, bytes [indexStart, indexOfsPos), into a new
    // indirectIndex. Per the original author, this yields the file's min/max
    // key (key = series key + field) and its min/max timestamps.
    m.index = NewIndirectIndex()
    if err := m.index.UnmarshalBinary(m.b[indexStart:indexOfsPos]); err != nil {
        return nil, err

    // Allow resources to be freed immediately if requested
    atomic.StoreUint64(&m.freeCount, 1)

    return m.index, nil
// readBlock decodes the block described by entry, passing values to
// DecodeBlock as the destination, and returns the decoded values. It returns
// ErrTSMClosed when the block extends past the mapped bytes (presumably
// because the file has been closed and unmapped).
//
// NOTE(review): the m.mu.RLock() paired with the deferred RUnlock is elided in
// this excerpt. The bounds check does not reject entry.Size < 4. Such an entry
// would make the slice below have low > high and panic.
func (m *mmapAccessor) readBlock(entry *IndexEntry, values []Value) ([]Value, error) {

    defer m.mu.RUnlock()

    if int64(len(m.b)) < entry.Offset+int64(entry.Size) {
        return nil, ErrTSMClosed
    //TODO: Validate checksum
    var err error
    // Decode the block data, skipping its leading 4-byte CRC.
    values, err = DecodeBlock(m.b[entry.Offset+4:entry.Offset+int64(entry.Size)], values)
    if err != nil {
        return nil, err

    return values, nil
// readBytes returns the 4-byte CRC stored at entry.Offset and the block bytes
// that follow it. The block is a sub-slice of the mapping m.b, not a copy, so
// it is presumably only valid while the file stays mapped. It returns
// ErrTSMClosed when the block extends past the mapped bytes.
//
// NOTE(review): b is unused in the code shown, and no locking is visible here
// (the blank first line may mark elided m.mu.RLock/RUnlock calls).
func (m *mmapAccessor) readBytes(entry *IndexEntry, b []byte) (uint32, []byte, error) {

    if int64(len(m.b)) < entry.Offset+int64(entry.Size) {
        return 0, nil, ErrTSMClosed

    // return the bytes after the 4 byte checksum
    crc, block := binary.BigEndian.Uint32(m.b[entry.Offset:entry.Offset+4]), m.b[entry.Offset+4:entry.Offset+int64(entry.Size)]

    return crc, block, nil
// readAll returns every non-deleted value of key across all of its blocks, in
// the order the blocks are returned by m.index.Entries. Blocks whose time
// range lies entirely inside one tombstone range are skipped without being
// decoded. In the remaining blocks, points inside any tombstone range are
// removed with Exclude.
//
// NOTE(review): this excerpt elides the m.mu.RLock() paired with the deferred
// RUnlock, the `break` after `skip = true`, and the `continue` inside
// `if skip {`.
func (m *mmapAccessor) readAll(key []byte) ([]Value, error) {

    blocks := m.index.Entries(key)
    if len(blocks) == 0 {
        return nil, nil

    // Time ranges of key that have been marked deleted (tombstoned).
    tombstones := m.index.TombstoneRange(key)

    defer m.mu.RUnlock()

    var temp []Value
    var err error
    var values []Value
    for _, block := range blocks {
        var skip bool
        // If the block's [MinTime, MaxTime] lies entirely inside one tombstone
        // range, every point in it is deleted and the block needs no decoding.
        for _, t := range tombstones {
            // Should we skip this block because it contains points that have been deleted
            if t.Min <= block.MinTime && t.Max >= block.MaxTime {
                skip = true

        if skip {
        //TODO: Validate checksum
        // temp's storage is reused across blocks; its contents are copied into
        // values below.
        temp = temp[:0]
        // The +4 is the 4 byte checksum length
        // Decode the block into its Values (each has a timestamp and a value).
        temp, err = DecodeBlock(m.b[block.Offset+4:block.Offset+int64(block.Size)], temp)
        if err != nil {
            return nil, err

        // Filter out any values that were deleted
        // (points inside any tombstone range of a partially deleted block).
        for _, t := range tombstones {
            temp = Values(temp).Exclude(t.Min, t.Max)

        values = append(values, temp...)

    return values, nil
// indirectIndex gives access to a TSM file's index section kept as raw bytes
// (b). offsets points at each key's sub-index inside b.
type indirectIndex struct {
    mu sync.RWMutex
    // b is the raw index data that offsets point into.
        b []byte

    // offsets contains the positions in b for each key.  It points to the 2 byte length of
    // key.
    // Each position is a 4-byte big-endian value. They are kept in key order,
    // so a key's sub-index can be located by binary search (searchOffset).
    offsets []byte

    // minKey, maxKey are the minium and maximum (lexicographically sorted) contained in the
    // file
    minKey, maxKey []byte

    // minTime, maxTime are the minimum and maximum times contained in the file across all
    // series.
    minTime, maxTime int64

    // tombstones contains only the tombstoned keys with subset of time values deleted.  An
    // entry would exist here if a subset of the points for a key were deleted and the file
    // had not be re-compacted to remove the points on disk.
    tombstones map[string][]TimeRange
func (d *indirectIndex) offset(i int) int {
    if i < 0 || i+4 > len(d.offsets) {
        return -1
    return int(binary.BigEndian.Uint32(d.offsets[i*4 : i*4+4]))
func (d *indirectIndex) searchOffset(key []byte) int {
    // 二分法查找
    i := bytesutil.SearchBytesFixed(d.offsets, 4, func(x []byte) bool {
        // i is the position in offsets we are at so get offset it points to
        offset := int32(binary.BigEndian.Uint32(x))

        // It's pointing to the start of the key which is a 2 byte length
        keyLen := int32(binary.BigEndian.Uint16(d.b[offset : offset+2]))

        // See if it matches
        return bytes.Compare(d.b[offset+2:offset+2+keyLen], key) >= 0

    // See if we might have found the right index
    if i < len(d.offsets) {
        return int(i / 4)

    // The key is not in the index.  i is the index where it would be inserted so return
    // a value outside our offset range.
    return int(len(d.offsets)) / 4
// ReadEntries returns the index entries for key, or nil if key is not in the
// index. entries is handed to readEntriesAt, presumably as reusable storage.
//
// NOTE(review): the d.mu.RLock() paired with the deferred RUnlock is elided in
// this excerpt. Inside the if-block, `k, entries :=` declares a new entries
// ([]IndexEntry) that shadows the *[]IndexEntry parameter.
func (d *indirectIndex) ReadEntries(key []byte, entries *[]IndexEntry) []IndexEntry {
    defer d.mu.RUnlock()

    // ofs is a position in d.b. Positions at or past len(d.b) mean "not found".
    ofs := d.search(key)
    if ofs < len(d.b) {
        k, entries := d.readEntriesAt(ofs, entries)
        // The search may have returned an i == 0 which could indicated that the value
        // searched should be inserted at position 0.  Make sure the key in the index
        // matches the search value.
        if !bytes.Equal(key, k) {
            return nil

        return entries

    // The key is not in the index.  i is the index where it would be inserted.
    return nil
// Delete removes keys from the index. It overwrites their slots in d.offsets
// with nilOffset, then compacts d.offsets with bytesutil.Pack (which
// presumably drops slots whose 4 bytes are all 255). The index bytes in d.b
// are left untouched.
//
// NOTE(review): this excerpt elides the bodies of the first two ifs
// (presumably an early return when there are no keys, and sorting unsorted
// keys) as well as any locking.
func (d *indirectIndex) Delete(keys [][]byte) {
    if len(keys) == 0 {

    if !bytesutil.IsSorted(keys) {

    // Both keys and offsets are sorted.  Walk both in order; keys present in
    // both are marked deleted.
    start := d.searchOffset(keys[0])
    for i := start * 4; i+4 <= len(d.offsets) && len(keys) > 0; i += 4 {
        offset := binary.BigEndian.Uint32(d.offsets[i : i+4])
        _, indexKey := readKey(d.b[offset:])

        // Drop requested keys that sort before indexKey; they are not in the index.
        for len(keys) > 0 && bytes.Compare(keys[0], indexKey) < 0 {
            keys = keys[1:]

        if len(keys) > 0 && bytes.Equal(keys[0], indexKey) {
            keys = keys[1:]
            // nilOffset is 4 bytes, each 255. Writing it into this slot marks
            // the key as deleted.
            copy(d.offsets[i:i+4], nilOffset[:])
    d.offsets = bytesutil.Pack(d.offsets, 4, 255)
// DeleteRange removes the values of keys that fall within [minTime, maxTime].
// Keys whose whole time range is covered, directly or by contiguous
// tombstones, are collected in fullKeys for removal from the index. Keys that
// are only partially covered get a TimeRange tombstone, merged with any
// existing ones, which is stored in d.tombstones. keys are walked in step with
// the index's sorted keys, so they are expected to be sorted.
//
// NOTE(review): this excerpt elides the bodies of several ifs (presumably the
// d.Delete calls and returns, `break`s and `continue`s) as well as locking.
// newTs is built by appending to the existing d.tombstones slice, so it may
// share that slice's backing array.
func (d *indirectIndex) DeleteRange(keys [][]byte, minTime, maxTime int64) {
    // If [minTime, maxTime] covers all possible time, delete the keys entirely.
    if minTime == math.MinInt64 && maxTime == math.MaxInt64 {

    // If the range does not overlap this index's time range, nothing to delete.
    min, max := d.TimeRange()
    if minTime > max || maxTime < min {

    // fullKeys collects keys whose blocks must all be deleted.
    fullKeys := make([][]byte, 0, len(keys))
    // tombstones maps each partially deleted key to its deleted TimeRanges.
    tombstones := map[string][]TimeRange{}
    var ie []IndexEntry

    for i := 0; len(keys) > 0 && i < d.KeyCount(); i++ {
        k, entries := d.readEntriesAt(d.offset(i), &ie)

        // Skip any keys that don't exist.  These are less than the current key.
        for len(keys) > 0 && bytes.Compare(keys[0], k) < 0 {
            keys = keys[1:]

        // No more keys to delete, we're done.
        if len(keys) == 0 {

        // If the current key is greater than the index one, continue to the next
        // index key.
        if len(keys) > 0 && bytes.Compare(keys[0], k) > 0 {

        // If multiple tombstones are saved for the same key
        if len(entries) == 0 {

        // Move on if the delete range does not overlap this key's time range.
        min, max := entries[0].MinTime, entries[len(entries)-1].MaxTime
        if minTime > max || maxTime < min {

        // Does the range passed in cover every value for the key?
        // If so, the whole key is deleted.
        if minTime <= min && maxTime >= max {
            fullKeys = append(fullKeys, keys[0])
            keys = keys[1:]

        existing := d.tombstones[string(k)]

        // Only some values fall in the delete range: add a tombstone for it,
        // merged with the existing and pending tombstones for this key.
        newTs := append(existing, append(tombstones[string(k)], TimeRange{minTime, maxTime})...)
        fn := func(i, j int) bool {
            a, b := newTs[i], newTs[j]
            if a.Min == b.Min {
                return a.Max <= b.Max
            return a.Min < b.Min

        // Sort the updated tombstones if necessary
        if len(newTs) > 1 && !sort.SliceIsSorted(newTs, fn) {
            sort.Slice(newTs, fn)

        tombstones[string(k)] = newTs

        minTs, maxTs := newTs[0].Min, newTs[0].Max
        for j := 1; j < len(newTs); j++ {
            prevTs := newTs[j-1]
            ts := newTs[j]

            // Make sure all the tombstone line up for a continuous range.  We don't
            // want to have two small deletes on each edges end up causing us to
            // remove the full key.
            if prevTs.Max != ts.Min-1 && !prevTs.Overlaps(ts.Min, ts.Max) {
                minTs, maxTs = int64(math.MaxInt64), int64(math.MinInt64)

            if ts.Min < minTs {
                minTs = ts.Min
            if ts.Max > maxTs {
                maxTs = ts.Max

        // If we have a fully deleted series, delete all of it.
        if minTs <= min && maxTs >= max {
            fullKeys = append(fullKeys, keys[0])
            keys = keys[1:]

    // Delete all the keys that fully deleted in bulk
    if len(fullKeys) > 0 {

    if len(tombstones) == 0 {

    for k, v := range tombstones {
        d.tombstones[k] = v
// TSMReader reads a single TSM file. Blocks are read through accessor, keys
// and block locations through index, and deleted data is tracked by
// tombstoner.
type TSMReader struct {
    // refs is the count of active references to this reader.
    refs   int64
    refsWG sync.WaitGroup

    madviseWillNeed bool // Hint to the kernel with MADV_WILLNEED.
    mu              sync.RWMutex

    // accessor provides access and decoding of blocks for the reader.
    // NewTSMReader sets it to an mmapAccessor, so the file is read via mmap.
    accessor blockAccessor

    // index is the index of all blocks.
    // TSMIndex is an interface. NewTSMReader stores the index returned by
    // accessor.init() here (an *indirectIndex for mmapAccessor).
    index TSMIndex

    // tombstoner ensures tombstoned keys are not available by the index.
    // Data recorded by the tombstoner is deleted and not returned by reads.
    // Presumably the index applies it via DeleteRange.
    tombstoner *Tombstoner

    // size is the size of the file on disk.
    size int64

    // lastModified is the last time this file was modified on disk
    lastModified int64

    // deleteMu limits concurrent deletes
    deleteMu sync.Mutex
// NewTSMReader opens a TSMReader over the TSM file f. It:
//   - records the file's size and modification time;
//   - memory-maps the file and parses its index (via the accessor's init);
//   - creates the tombstoner and applies the file's tombstones to the index.
//
// NOTE(review): the loop body that applies each option to t is elided in this
// excerpt. The error returns after accessor.init() succeeds do not release the
// mapping in the code shown.
func NewTSMReader(f *os.File, options ...tsmReaderOption) (*TSMReader, error) {
    t := &TSMReader{}
    for _, option := range options {

    stat, err := f.Stat()
    if err != nil {
        return nil, err
    // Record the file's size and last modification time.
    t.size = stat.Size()
    t.lastModified = stat.ModTime().UnixNano()
    // Access the TSM file through a memory mapping.
    t.accessor = &mmapAccessor{
        f:            f,
        mmapWillNeed: t.madviseWillNeed,

    // Map the file and parse its index section.
    index, err := t.accessor.init()
    if err != nil {
        return nil, err

    t.index = index
    // Create the tombstoner. Per the original author, it loads the deleted
    // time ranges of each key from the file's tombstone file.
    t.tombstoner = NewTombstoner(t.Path(), index.ContainsKey)

    // Remove tombstoned data from the index.
    if err := t.applyTombstones(); err != nil {
        return nil, err

    return t, nil


// FileStore manages the set of TSM files in a directory.
type FileStore struct {
    mu           sync.RWMutex
    lastModified time.Time // most recent modification time across the managed TSM files
    // Most recently known file stats. If nil then stats will need to be
    // recalculated
    lastFileStats []FileStat // stats of all managed TSM files

    // Open raises currentGeneration to one past the highest generation among
    // the files it finds.
    currentGeneration int
    dir               string // directory containing the TSM files

    files           []TSMFile // managed TSM files (Open stores *TSMReader values here)
    tsmMMAPWillNeed bool          // If true then the kernel will be advised MMAP_WILLNEED for TSM files.
    openLimiter     limiter.Fixed // limit the number of concurrent opening TSM files.

    logger       *zap.Logger // Logger to be used for important messages
    traceLogger  *zap.Logger // Logger to be used when trace-logging is on.
    traceLogging bool

    stats  *FileStoreStatistics
    // purger defers deletion of TSM files. Per the original author, files that
    // are still in use when deleted are handed to it.
    purger *purger

    currentTempDirID int

    parseFileName ParseFileNameFunc

    obs tsdb.FileStoreObserver
// Open loads all the TSM files in the configured directory.
// Each *.tsm file in f.dir is opened read-only and turned into a TSMReader in
// its own goroutine. currentGeneration is raised past the highest generation
// found, and the loaded readers are collected into f.files.
//
// NOTE(review): this excerpt elides several lines:
//   - the openLimiter.Take() matching the deferred Release;
//   - the handling of NewTSMReader's err (res.err is never set in the code shown);
//   - the update of lm;
//   - the body of the `res.r == nil` branch;
//   - the sort announced near the end.
// Also, readerC is unbuffered. After any early return (in either loop),
// goroutines still loading files block forever on their send.
func (f *FileStore) Open() error {
    // Find all .tsm files in the directory.
    files, err := filepath.Glob(filepath.Join(f.dir, fmt.Sprintf("*.%s", TSMFileExtension)))
    if err != nil {
        return err

    // struct to hold the result of opening each reader in a goroutine
    type res struct {
        r   *TSMReader
        err error

    // Channel on which each loader goroutine reports its result.
    readerC := make(chan *res)
    for i, fn := range files {
        // Keep track of the latest ID
        generation, _, err := f.parseFileName(fn)
        if err != nil {
            return err

        if generation >= f.currentGeneration {
            f.currentGeneration = generation + 1

        // Open the TSM file read-only.
        file, err := os.OpenFile(fn, os.O_RDONLY, 0666)
        if err != nil {
            return fmt.Errorf("error opening file %s: %v", fn, err)

        // Load each TSM file in its own goroutine.
        go func(idx int, file *os.File) {
            // openLimiter caps how many TSM files are loaded concurrently.
            defer f.openLimiter.Release()

            start := time.Now()
            // Create a TSMReader for the file.
            df, err := NewTSMReader(file, WithMadviseWillNeed(f.tsmMMAPWillNeed))

            readerC <- &res{r: df}
        }(i, file)

    var lm int64
    for range files {
        res := <-readerC
        if res.err != nil {
            return res.err
        } else if res.r == nil {
        // Keep each successfully loaded reader.
        f.files = append(f.files, res.r)
    f.lastModified = time.Unix(0, lm).UTC()

    // Sort by file name (the sort itself is elided in this excerpt).
    atomic.StoreInt64(&f.stats.FileCount, int64(len(f.files)))
    return nil
// WalkKeys calls fn with each key, and its block type, produced by the merge
// key iterator over all TSM files starting from seek. It stops at, and
// returns, the first error fn returns.
//
// NOTE(review): any locking around f.files is elided in this excerpt.
func (f *FileStore) WalkKeys(seek []byte, fn func(key []byte, typ byte) error) error {

    // Iterate over the keys >= seek from every TSM file, presumably in merged
    // order. The per-file seek goes through indirectIndex.searchOffset. The
    // original author worried that, when a file lacks seek, this lands on the
    // file's largest key. searchOffset's predicate (key >= seek) suggests it
    // instead lands on the first key >= seek, or past the end.
    // NOTE(review): confirm against newMergeKeyIterator.
    ki := newMergeKeyIterator(f.files, seek)
    for ki.Next() {
        key, typ := ki.Read()
        if err := fn(key, typ); err != nil {
            return err

    return nil
// DeleteRange removes the values of keys in [min, max] from every TSM file
// whose time range overlaps [min, max]. The deletes are staged on all such
// files and committed together. If staging or committing fails, every batch
// is rolled back and the error is returned. On success, lastModified is
// updated and the cached file stats are invalidated.
//
// NOTE(review): no locking around f.files or the field updates is shown in
// this excerpt.
//
// (The start of the original doc comment is cut off in this excerpt; its last
// line follows.)
// be used with smaller batches of series keys.
func (f *FileStore) DeleteRange(keys [][]byte, min, max int64) error {
    var batches BatchDeleters
    // Start a batch delete on every TSM file that overlaps [min, max].
    // The loop variable f shadows the *FileStore receiver; the receiver is
    // used again after the loop.
    for _, f := range f.files {
        if f.OverlapsTimeRange(min, max) {
            batches = append(batches, f.BatchDelete())

    if len(batches) == 0 {
        return nil

    if err := func() error {
        if err := batches.DeleteRange(keys, min, max); err != nil {
            return err

        // Per the original author, a successful Commit writes the deletes to
        // the tombstone files.
        return batches.Commit()
    }(); err != nil {
        // Rollback the deletes. Rollback's own error is deliberately ignored
        // so the original failure is the one returned.
        _ = batches.Rollback()
        return err

    f.lastModified = time.Now().UTC()
    f.lastFileStats = nil
    return nil
// Read returns the values for key at timestamp t from the first TSM file, in
// f.files order, that contains key and returns any values. Later files are
// not consulted. It returns nil, nil if no file yields values.
//
// NOTE(review): the f.mu.RLock() paired with the deferred RUnlock and the
// `continue` in the `!f.Contains(key)` branch are elided in this excerpt. The
// loop variable f shadows the *FileStore receiver inside the loop.
func (f *FileStore) Read(key []byte, t int64) ([]Value, error) {
    defer f.mu.RUnlock()

    for _, f := range f.files {
        // Can this file possibly contain this key and timestamp?
        if !f.Contains(key) {

        // May have the key and time we are looking for so try to find
        v, err := f.Read(key, t)
        if err != nil {
            return nil, err

        // Stop at the first file that returns values.
        if len(v) > 0 {
            return v, nil
    return nil, nil
type location struct {
    r     TSMFile  // 属于哪个TSM文件 
    entry IndexEntry //封装的是哪个IndexEntry

    readMin, readMax int64 // 一个IndexEntry包含了一段时间范围,这两个值表示已经读过了这个IndexEntry中的哪段时间的数据

func (l *location) read() bool {
    return l.readMin <= l.entry.MinTime && l.readMax >= l.entry.MaxTime

我们看下 locations的实现:

// locations returns, across all TSM files, a location for each block of key
// that meets two conditions:
//   - it may hold data at or after t (ascending) or at or before t
//     (descending);
//   - it is not entirely covered by a tombstone.
// Each location's read range is pre-set so that the part of the block on the
// unwanted side of t counts as already read.
//
// NOTE(review): this excerpt elides the `continue`s in the empty if-branches
// and the `LOOP:` label that `continue LOOP` targets (presumably on the
// entries loop). Inside the tombstone loop, t shadows the parameter t.
// `location := &location{...}` shadows the type name.
func (f *FileStore) locations(key []byte, t int64, ascending bool) []*location {
    var cache []IndexEntry
    locations := make([]*location, 0, len(f.files))
    for _, fd := range f.files {
        minTime, maxTime := fd.TimeRange()

        // First skip files whose time range lies entirely on the unwanted
        // side of t.
        if ascending && maxTime < t {
        } else if !ascending && minTime > t {
        tombstones := fd.TombstoneRange(key)

        // Then filter the file's index entries for key.
        entries := fd.ReadEntries(key, &cache)
        for i := 0; i < len(entries); i++ {
            ie := entries[i]

            // Skip entries entirely covered by a tombstone range.
            for _, t := range tombstones {
                if t.Min <= ie.MinTime && t.Max >= ie.MaxTime {
                    continue LOOP

            if ascending && ie.MaxTime < t {
                // Ascending: the block ends before t, so it holds nothing at
                // or after t; skip it.
            } else if !ascending && ie.MinTime > t {
                // Descending: the block starts after t; skip it.

            // Record a location for this block.
            location := &location{
                r:     fd,
                entry: ie,

            // Mark the part of the block before t (ascending) or after t
            // (descending) as already read so it is not returned.
            if ascending {
                location.readMin = math.MinInt64
                location.readMax = t - 1
            } else {
                location.readMin = t + 1
                location.readMax = math.MaxInt64
            // Otherwise, add this file and block location
            locations = append(locations, location)
    return locations



// KeyCursor walks the block locations (seeks) of a single key across TSM
// files, in ascending or descending order.
type KeyCursor struct {
    key []byte

    // seeks is all the file locations that we need to return during iteration.
    seeks []*location

    // current is the set of blocks possibly containing the next set of points.
    // Normally this is just one entry, but there may be multiple if points have
    // been overwritten.
    current []*location
    buf     []Value

    ctx context.Context
    col *metrics.Group

    // pos is the index within seeks.  Based on ascending, it will increment or
    // decrement through the size of seeks slice.
    pos       int
    ascending bool
上一篇 下一篇

