
Influxdb TSM File Structure Explained: The WAL

2019-01-03  扫帚的影子

Data Types Stored in Influxdb

Two things are stored with every point in Influxdb: the timestamp of the point, and its Field values; each Field type has a corresponding type in the source code.
The common abstraction is Value, an interface defined in tsdb/engine/tsm1/encoding.go. FloatValue, for example, is the implementation for float fields:

type FloatValue struct {
    unixnano int64
    value    float64
}
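
The struct above is one concrete implementation; the Value interface itself looks roughly like this (reconstructed from the same file; the exact method set may differ slightly across versions):

type Value interface {
    UnixNano() int64    // the point's timestamp, in nanoseconds
    Value() interface{} // the field value itself
    Size() int
    String() string
}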
Encoding and Decoding

Each type must be encoded, and compressed as far as possible, when stored, so an Encoder and a Decoder are provided for every type.
An Encoder compresses and encodes a batch of Values of the same type; we won't expand on the specific encoding algorithms here.
Let's analyze the FloatValue case via encodeFloatBlockUsing.
The parameter values []Value is a batch of FloatValues; both the float values and their timestamps need to be encoded.

func encodeFloatBlockUsing(buf []byte, values []Value, tsenc TimeEncoder, venc *FloatEncoder) ([]byte, error) {
    tsenc.Reset()
    venc.Reset()

    for _, v := range values {
        vv := v.(FloatValue)
        tsenc.Write(vv.unixnano) // encode each timestamp with the TimeEncoder
        venc.Write(vv.value)     // encode each float value with the FloatEncoder
    }
    venc.Flush()

    // Encoded timestamp values
    tb, err := tsenc.Bytes()
    if err != nil {
        return nil, err
    }
    // Encoded float values
    vb, err := venc.Bytes()
    if err != nil {
        return nil, err
    }

    // Pack the block type byte, the encoded timestamps, and the encoded
    // values into a single block.
    return packBlock(buf, BlockFloat64, tb, vb), nil
}
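
A hypothetical call, assuming the tsm1 constructors NewFloatValue, NewTimeEncoder, and NewFloatEncoder (names taken from the same package; verify them against your version):

values := []Value{
    NewFloatValue(1000, 1.5), // (unixnano, value)
    NewFloatValue(2000, 2.5),
}
block, err := encodeFloatBlockUsing(nil, values, NewTimeEncoder(len(values)), NewFloatEncoder())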
Packing into a DataBlock

A DataBlock is the smallest unit written to and read from a TSM file. Every Value in a DataBlock has the same type, and all of them belong to the same written key, where key = series key + field.
Influxdb is essentially a columnar store: within a block all the Values are laid out contiguously, and their timestamps are laid out contiguously as well, which makes both easier to compress.

[figure: influxdb_data_block.png, the layout of a DataBlock]

Note that this structure does not record the length of the Values section. That is because the total length of the timestamp section is recorded: parsing the timestamp section tells us how many timestamps there are, and therefore how many Values follow.

Let's walk through the packing; with the structure diagram above in mind, it is straightforward:

func packBlock(buf []byte, typ byte, ts []byte, values []byte) []byte {
    // We encode the length of the timestamp block using a variable byte encoding.
    // This allows small byte slices to take up 1 byte while larger ones use 2 or more.
    sz := 1 + binary.MaxVarintLen64 + len(ts) + len(values)
    if cap(buf) < sz {
        buf = make([]byte, sz)
    }
    b := buf[:sz]
    b[0] = typ
    i := binary.PutUvarint(b[1:1+binary.MaxVarintLen64], uint64(len(ts)))
    i += 1 // account for the type byte at b[0]

    // block is <len timestamp bytes>, <ts bytes>, <value bytes>
    copy(b[i:], ts)
    // We don't encode the value length because we know it's the rest of the block after
    // the timestamp block.
    copy(b[i+len(ts):], values)
    return b[:i+len(ts)+len(values)]
}
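
For completeness, here is a sketch of the inverse helper, unpackBlock, which the decode path below relies on (simplified from the same file):

func unpackBlock(buf []byte) (ts, values []byte, err error) {
    // read the varint-encoded length of the timestamp block
    tsLen, i := binary.Uvarint(buf)
    if i <= 0 {
        return nil, nil, fmt.Errorf("unpackBlock: unable to read timestamp block length")
    }

    // slice out the timestamp bytes
    tsIdx := i + int(tsLen)
    if tsIdx > len(buf) {
        return nil, nil, fmt.Errorf("unpackBlock: not enough data for timestamp")
    }
    ts = buf[i:tsIdx]

    // whatever remains is the value bytes
    return ts, buf[tsIdx:], nil
}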
Unpacking a DataBlock

Again taking FloatValue as the example:

func DecodeFloatBlock(block []byte, a *[]FloatValue) ([]FloatValue, error) {
    // The first byte is the block type; make sure we actually have a float block
    blockType := block[0]
    if blockType != BlockFloat64 {
        return nil, fmt.Errorf("invalid block type: exp %d, got %d", BlockFloat64, blockType)
    }
    
    // skip the 1-byte block type
    block = block[1:]

    tb, vb, err := unpackBlock(block)
    if err != nil {
        return nil, err
    }

    // count how many values the block holds
    sz := CountTimestamps(tb)

    if cap(*a) < sz {
        *a = make([]FloatValue, sz)
    } else {
        *a = (*a)[:sz]
    }

    tdec := timeDecoderPool.Get(0).(*TimeDecoder)
    vdec := floatDecoderPool.Get(0).(*FloatDecoder)

    var i int
    err = func(a []FloatValue) error {
        // Setup our timestamp and value decoders
        tdec.Init(tb)
        err = vdec.SetBytes(vb)
        if err != nil {
            return err
        }

        // Decode both a timestamp and value
        j := 0
        for j < len(a) && tdec.Next() && vdec.Next() {
            a[j] = FloatValue{unixnano: tdec.Read(), value: vdec.Values()}
            j++
        }
        i = j

        // Did timestamp decoding have an error?
        err = tdec.Error()
        if err != nil {
            return err
        }

        // Did float decoding have an error?
        return vdec.Error()
    }(*a)
    
    timeDecoderPool.Put(tdec)
    floatDecoderPool.Put(vdec)

    return (*a)[:i], err
}
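
A hypothetical usage, reusing a scratch slice across calls to cut allocations:

var scratch []FloatValue
vals, err := DecodeFloatBlock(block, &scratch)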
Other DataBlock Operations
func BlockType(block []byte) (byte, error) {
    blockType := block[0]
    switch blockType {
    case BlockFloat64, BlockInteger, BlockUnsigned, BlockBoolean, BlockString:
        return blockType, nil
    default:
        return 0, fmt.Errorf("unknown block type: %d", blockType)
    }
}
func BlockCount(block []byte) int {
    if len(block) <= encodedBlockHeaderSize {
        panic(fmt.Sprintf("count of short block: got %v, exp %v", len(block), encodedBlockHeaderSize))
    }
    // first byte is the block type
    tb, _, err := unpackBlock(block[1:])
    if err != nil {
        panic(fmt.Sprintf("BlockCount: error unpacking block: %s", err.Error()))
    }
    return CountTimestamps(tb)
}
func DecodeBlock(block []byte, vals []Value) ([]Value, error) {
    if len(block) <= encodedBlockHeaderSize {
        panic(fmt.Sprintf("decode of short block: got %v, exp %v", len(block), encodedBlockHeaderSize))
    }

    blockType, err := BlockType(block)
    if err != nil {
        return nil, err
    }

    switch blockType {
    case BlockFloat64:
        var buf []FloatValue
        decoded, err := DecodeFloatBlock(block, &buf)
        if len(vals) < len(decoded) {
            vals = make([]Value, len(decoded))
        }
        for i := range decoded {
            vals[i] = decoded[i]
        }
        return vals[:len(decoded)], err
    case BlockInteger:
        ...
    case BlockUnsigned:
        ...
    case BlockBoolean:
        ...
    case BlockString:
        ...
    default:
        panic(fmt.Sprintf("unknown block type: %d", blockType))
    }
}

WALEntry

  1. The WAL serves as the write-ahead log for data on its way into TSM files.
  2. Every Shard, under every RetentionPolicy of every DB, has its own WAL file directory; the dedicated directory that holds all Shards' WAL files is set in the Influxdb startup configuration.
  3. Each Shard's WAL directory contains multiple wal files, each called a WALSegment, 10MB in size by default. File names start with _, followed by an ID, with a wal extension, e.g. _00001.wal (a parsing sketch appears under the Open section below).
  4. Each write to the WAL is a WALEntry, which must be serialized on write and deserialized on read. Let's first look at its definition:
type WALEntry interface {
    Type() WalEntryType  // the entry type: WriteWALEntry, DeleteWALEntry, or DeleteRangeWALEntry
    Encode(dst []byte) ([]byte, error)
    MarshalBinary() ([]byte, error) // serializes via the Encode method above
    UnmarshalBinary(b []byte) error // deserializes
    MarshalSize() int
}

Below we analyze the three concrete WALEntry types.

WriteWALEntry
type WriteWALEntry struct {
    Values map[string][]Value
    sz     int
}

Values is a map whose key is series key + field, and whose value is all the field values that share that key; in effect, multiple points are merged by series key + field.
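
A minimal, hypothetical example of such an entry (NewFloatValue is a constructor from the tsm1 package; tsm1 joins series key and field name with a "#!~#" separator, so treat the key literal below as illustrative):

entry := &WriteWALEntry{
    Values: map[string][]Value{
        // key = series key + field
        "cpu,host=A#!~#usage": {
            NewFloatValue(1000, 0.5), // (unixnano, value)
            NewFloatValue(2000, 0.6),
        },
    },
}

Encode serializes this map: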

func (w *WriteWALEntry) Encode(dst []byte) ([]byte, error) {
    // compute the total size up front so memory can be pre-allocated
    encLen := w.MarshalSize() // Type (1), Key Length (2), and Count (4) for each key

    // allocate or re-slice to correct size
    if len(dst) < encLen {
        dst = make([]byte, encLen)
    } else {
        dst = dst[:encLen]
    }

    // Finally, encode the entry
    var n int
    var curType byte

    // iterate over Values and encode each key's batch
    for k, v := range w.Values {
        // determine the field type from the first value
        switch v[0].(type) {
        case FloatValue:
            curType = float64EntryType
        case IntegerValue:
            curType = integerEntryType
        case UnsignedValue:
            curType = unsignedEntryType
        case BooleanValue:
            curType = booleanEntryType
        case StringValue:
            curType = stringEntryType
        default:
            return nil, fmt.Errorf("unsupported value type: %T", v[0])
        }
        
        // write the type byte
        dst[n] = curType
        n++
 
        // write the key length; key = series key + field
        binary.BigEndian.PutUint16(dst[n:n+2], uint16(len(k)))
        n += 2
        // write the key
        n += copy(dst[n:], k)

        // write the number of values
        binary.BigEndian.PutUint32(dst[n:n+4], uint32(len(v)))
        n += 4

        // write every value: an 8-byte timestamp followed by a type-specific payload
        for _, vv := range v {
            binary.BigEndian.PutUint64(dst[n:n+8], uint64(vv.UnixNano()))
            n += 8

            switch vv := vv.(type) {
            case FloatValue:
                if curType != float64EntryType {
                    return nil, fmt.Errorf("incorrect value found in %T slice: %T", v[0].Value(), vv)
                }
                binary.BigEndian.PutUint64(dst[n:n+8], math.Float64bits(vv.value))
                n += 8
            case IntegerValue:
                if curType != integerEntryType {
                    return nil, fmt.Errorf("incorrect value found in %T slice: %T", v[0].Value(), vv)
                }
                binary.BigEndian.PutUint64(dst[n:n+8], uint64(vv.value))
                n += 8
            case UnsignedValue:
                if curType != unsignedEntryType {
                    return nil, fmt.Errorf("incorrect value found in %T slice: %T", v[0].Value(), vv)
                }
                binary.BigEndian.PutUint64(dst[n:n+8], uint64(vv.value))
                n += 8
            case BooleanValue:
                if curType != booleanEntryType {
                    return nil, fmt.Errorf("incorrect value found in %T slice: %T", v[0].Value(), vv)
                }
                if vv.value {
                    dst[n] = 1
                } else {
                    dst[n] = 0
                }
                n++
            case StringValue:
                if curType != stringEntryType {
                    return nil, fmt.Errorf("incorrect value found in %T slice: %T", v[0].Value(), vv)
                }
                binary.BigEndian.PutUint32(dst[n:n+4], uint32(len(vv.value)))
                n += 4
                n += copy(dst[n:], vv.value)
            default:
                return nil, fmt.Errorf("unsupported value found in %T slice: %T", v[0].Value(), vv)
            }
        }
    }

    return dst[:n], nil
}
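
Putting the pieces together, each key's section of an encoded WriteWALEntry lays out as:

type(1) | keyLen(2) | key | count(4) | count x ( unixnano(8) + payload )

where the payload is 8 bytes for float/integer/unsigned values, 1 byte for booleans, and a 4-byte length followed by the bytes for strings.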
DeleteWALEntry
type DeleteWALEntry struct {
    Keys [][]byte
    sz   int
}
func (w *DeleteWALEntry) Encode(dst []byte) ([]byte, error) {
    sz := w.MarshalSize()

    if len(dst) < sz {
        dst = make([]byte, sz)
    }

    var n int
    for _, k := range w.Keys {
        n += copy(dst[n:], k)
        n += copy(dst[n:], "\n")
    }

    // We return n-1 to strip off the last newline so that unmarshalling the value
    // does not produce an empty string
    return []byte(dst[:n-1]), nil
}
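
Since the keys are newline-separated, decoding is just a split on '\n'. A simplified sketch of what the matching UnmarshalBinary does, using bytes.Split from the standard library (illustrative, not the exact source):

// decodeDeleteKeys recovers the keys from an encoded DeleteWALEntry body.
func decodeDeleteKeys(b []byte) [][]byte {
    return bytes.Split(b, []byte("\n"))
}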
DeleteRangeWALEntry
type DeleteRangeWALEntry struct {
    Keys     [][]byte
    Min, Max int64  // start and end timestamps of the range
    sz       int
}
func (w *DeleteRangeWALEntry) Encode(b []byte) ([]byte, error) {
    sz := w.MarshalSize()

    if len(b) < sz {
        b = make([]byte, sz)
    }

    // write the start and end timestamps
    binary.BigEndian.PutUint64(b[:8], uint64(w.Min))
    binary.BigEndian.PutUint64(b[8:16], uint64(w.Max))

    i := 16
    // write each key, length-prefixed
    for _, k := range w.Keys {
        binary.BigEndian.PutUint32(b[i:i+4], uint32(len(k)))
        i += 4
        i += copy(b[i:], k)
    }

    return b[:i], nil
}
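
The resulting layout is: min(8) | max(8) | repeated ( keyLen(4) | key ). Note that, unlike DeleteWALEntry, each key here is length-prefixed rather than newline-separated.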

Writing a WALEntry

func (w *WALSegmentWriter) Write(entryType WalEntryType, compressed []byte) error {
    var buf [5]byte
    // write the entry type and the length of the payload
    buf[0] = byte(entryType)
    binary.BigEndian.PutUint32(buf[1:5], uint32(len(compressed)))

    if _, err := w.bw.Write(buf[:]); err != nil {
        return err
    }

    // write the payload itself (already snappy-compressed by the caller)
    if _, err := w.bw.Write(compressed); err != nil {
        return err
    }

    w.size += len(buf) + len(compressed)

    return nil
}
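
Reading an entry back mirrors this: a 5-byte header (type + payload length) followed by the snappy-compressed body. A simplified sketch of what a segment reader does (the real WALSegmentReader adds buffering and validation; treat this as illustrative):

func readEntry(r io.Reader) (WalEntryType, []byte, error) {
    // read the 5-byte header: 1 byte type + 4 bytes payload length
    var hdr [5]byte
    if _, err := io.ReadFull(r, hdr[:]); err != nil {
        return 0, nil, err
    }
    length := binary.BigEndian.Uint32(hdr[1:5])

    // read the snappy-compressed payload
    compressed := make([]byte, length)
    if _, err := io.ReadFull(r, compressed); err != nil {
        return 0, nil, err
    }

    // decompress; writeToLog compressed the encoded entry before writing it
    b, err := snappy.Decode(nil, compressed)
    return WalEntryType(hdr[0]), b, err
}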

WAL

The WAL type encapsulates all operations on the write-ahead log. As mentioned above, one Shard maps to one WAL, and one WAL produces multiple WALSegments as it is written to.
Let's analyze the main methods:

The Open Operation

Open walks all segment files under a Shard's WAL directory, sorted by id in ascending order, and initializes the WAL state:

func (l *WAL) Open() error {
    l.mu.Lock()
    defer l.mu.Unlock()
    ...

    if err := os.MkdirAll(l.path, 0777); err != nil {
        return err
    }

    // list all segment files, sorted by id ascending; the last one is being written to
    segments, err := segmentFileNames(l.path)
    if err != nil {
        return err
    }

    if len(segments) > 0 {
        // the last segment is the one currently being written
        lastSegment := segments[len(segments)-1]
        
        // parse the latest segment id from its file name
        id, err := idFromFileName(lastSegment)
        if err != nil {
            return err
        }

        // initialize the current segment id
        l.currentSegmentID = id
        stat, err := os.Stat(lastSegment)
        if err != nil {
            return err
        }

        if stat.Size() == 0 {
            // the last segment is empty; remove it
            os.Remove(lastSegment)
            segments = segments[:len(segments)-1]
        } else {
            // otherwise, reopen the file for appending
            fd, err := os.OpenFile(lastSegment, os.O_RDWR, 0666)
            if err != nil {
                return err
            }
            if _, err := fd.Seek(0, io.SeekEnd); err != nil {
                return err
            }
            
            // initialize the current SegmentWriter
            l.currentSegmentWriter = NewWALSegmentWriter(fd)

            // Reset the current segment size stat
            atomic.StoreInt64(&l.stats.CurrentBytes, stat.Size())
        }
    }

    ...
    
    l.closing = make(chan struct{})

    return nil
}
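
Open leans on two helpers, segmentFileNames and idFromFileName. A sketch of the latter, which parses the numeric id out of a name like _00001.wal using the standard strings, filepath, and strconv packages (reconstructed; the exact error text may differ):

func idFromFileName(name string) (int, error) {
    parts := strings.Split(filepath.Base(name), ".")
    if len(parts) != 2 {
        return 0, fmt.Errorf("file %s has wrong name format to have an id", name)
    }
    // skip the leading '_' and parse the numeric id
    id, err := strconv.ParseUint(parts[0][1:], 10, 32)
    return int(id), err
}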
The writeToLog Write Path
func (l *WAL) writeToLog(entry WALEntry) (int, error) {
    // grab a byte slice from bytesPool to avoid repeated allocations
    bytes := bytesPool.Get(entry.MarshalSize())

    // encode the entry, as described above
    b, err := entry.Encode(bytes)
    if err != nil {
        bytesPool.Put(bytes)
        return -1, err
    }

    // snappy-compress the encoded entry
    encBuf := bytesPool.Get(snappy.MaxEncodedLen(len(b)))

    compressed := snappy.Encode(encBuf, b)
    bytesPool.Put(bytes)

    syncErr := make(chan error)

    segID, err := func() (int, error) {
        l.mu.Lock()
        defer l.mu.Unlock()

        // Make sure the log has not been closed
        select {
        case <-l.closing:
            return -1, ErrWALClosed
        default:
        }

        // roll the segment file if needed
        if err := l.rollSegment(); err != nil {
            return -1, fmt.Errorf("error rolling WAL segment: %v", err)
        }

        // write and sync
        // write the entry body through the current SegmentWriter
        if err := l.currentSegmentWriter.Write(entry.Type(), compressed); err != nil {
            return -1, fmt.Errorf("error writing WAL entry: %v", err)
        }

        select {
        case l.syncWaiters <- syncErr:
        default:
            return -1, fmt.Errorf("error syncing wal")
        }
        
        // schedule a file sync so the data is flushed to disk
        l.scheduleSync()

        // Update stats for current segment size
        atomic.StoreInt64(&l.stats.CurrentBytes, int64(l.currentSegmentWriter.size))

        l.lastWriteTime = time.Now().UTC()

        return l.currentSegmentID, nil
    }()

    bytesPool.Put(encBuf)

    if err != nil {
        return segID, err
    }

    // schedule an fsync and wait for it to complete
    return segID, <-syncErr
}
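
Note the shape of the sync path: the writer queues a result channel on l.syncWaiters, scheduleSync arranges for the segment file to be fsynced in the background, and the final <-syncErr blocks until that fsync reports back. This lets many concurrent writers share a single fsync (the batching details live in scheduleSync, which is not shown here).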

Cache

  1. When time-series data is written, it first goes into the WAL described above, then into the Cache, and is finally flushed to disk files according to some policy. We now turn to this Cache.
  2. What exactly does this Cache hold?
  3. What in-memory structure does the Cache use for storage?
    We answer these questions one by one below:
Entry
type entry struct {
    mu     sync.RWMutex
    values Values // All stored values.

    // The type of values stored. Read only so doesn't need to be protected by
    // mu.
    vtype byte
}

From this definition we can see that all values inside one entry share the same type, namely the type recorded in vtype.

func newEntryValues(values []Value) (*entry, error) {
    e := &entry{}
    e.values = make(Values, 0, len(values))
    e.values = append(e.values, values...)

    // No values, don't check types and ordering
    if len(values) == 0 {
        return e, nil
    }

    // Personal take: the type-consistency check should come first; if the types conflict, the make/append above is wasted work.
    et := valueType(values[0])
    for _, v := range values {
        // Make sure all the values are the same type
        if et != valueType(v) {
            return nil, tsdb.ErrFieldTypeConflict
        }
    }

    // Set the type of values stored.
    e.vtype = et

    return e, nil
}
func (e *entry) deduplicate() {
    e.mu.Lock()
    defer e.mu.Unlock()

    if len(e.values) <= 1 {
        return
    }
    e.values = e.values.Deduplicate()
}

This just calls Values.Deduplicate; the Values type provides several handy methods, such as deduplication and filtering.

func (e *entry) filter(min, max int64) {
    e.mu.Lock()
    if len(e.values) > 1 {
        e.values = e.values.Deduplicate()
    }
    e.values = e.values.Exclude(min, max)
    e.mu.Unlock()
}

This in turn calls Values.Exclude.

storer
type storer interface {
    entry(key []byte) *entry                        // Get an entry by its key.
    write(key []byte, values Values) (bool, error)  // Write an entry to the store.
    add(key []byte, entry *entry)                   // Add a new entry to the store.
    remove(key []byte)                              // Remove an entry from the store.
    keys(sorted bool) [][]byte                      // Return an optionally sorted slice of entry keys.
    apply(f func([]byte, *entry) error) error       // Apply f to all entries in the store in parallel.
    applySerial(f func([]byte, *entry) error) error // Apply f to all entries in serial.
    reset()                                         // Reset the store to an initial unused state.
    split(n int) []storer                           // Split splits the store into n stores
    count() int                                     // Count returns the number of keys in the store
}

The comments here are self-explanatory, so we won't restate them.
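
(As for question 3 above: in this version of the code the concrete storer behind the Cache is the ring type in tsdb/engine/tsm1/ring.go, which spreads entries across a fixed number of partitions by key hash. That pointer is from memory, so verify it against your source tree.)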
