JuiceFS 代码小工具学习

2023-11-03  本文已影响0人  wayyyy

以下代码片段取自:https://github.com/juicedata/juicefs

SleepWithJitter

Jitter 即“抖动”：在睡眠时长上叠加 ±5% 的随机偏移。

// SleepWithJitter pauses the calling goroutine for d plus a random offset
// drawn uniformly from [-d/20, +d/20], i.e. roughly +-5% of d.
//
// When d is too small (or negative) for a jitter window to exist, it sleeps
// for exactly d.
func SleepWithJitter(d time.Duration) {
    j := int64(d / 20) // half-width of the jitter window: 5% of d
    if j <= 0 {
        // rand.Int63n panics when its argument is <= 0, which is what
        // 2*j+1 becomes for d <= -20ns. With j == 0 the offset is always 0,
        // so sleeping for d directly keeps the previous behaviour there.
        time.Sleep(d)
        return
    }
    time.Sleep(d + time.Duration(rand.Int63n(2*j+1)-j))
}
内存大小格式化(可读性强)
// FormatBytes renders n as a human-readable size.
//
// Values below 1024 are printed as "<n> Bytes". Larger values are printed
// with two decimals in the largest binary unit (KiB .. EiB) that keeps the
// number below 1024, followed by the exact byte count in parentheses.
func FormatBytes(n uint64) string {
    const prefixes = "KMGTPE"

    if n < 1024 {
        return fmt.Sprintf("%d Bytes", n)
    }

    // scaled is kept one unit "below" the printed unit so that the final
    // division by 1024 yields the fractional part.
    scaled, idx := n, 0
    for idx < len(prefixes)-1 && scaled >= 1<<20 {
        scaled >>= 10
        idx++
    }
    value := float64(scaled) / 1024.0
    return fmt.Sprintf("%.2f %siB (%d Bytes)", value, prefixes[idx:idx+1], n)
}

// main prints the FormatBytes rendering of a handful of sample sizes,
// one per line.
func main() {
    samples := []uint64{88, 8888, 8888888, 88888888888, 8888888888888}
    for _, size := range samples {
        fmt.Println(tool.FormatBytes(size))
    }
}

输出:


image.png
MemoryUsage（进程内存占用）
// MemoryUsage reports the virtual memory size and resident set size of the
// current process, in bytes.
//
// It first parses /proc/self/stat (fields 23 "vsize" and 24 "rss"). If that
// file cannot be read or parsed, it falls back to getrusage(2) and returns
// ru.Maxrss for both values. If that fails too, both results are zero.
func MemoryUsage() (virt, rss uint64) {
    stat, err := ioutil.ReadFile("/proc/self/stat")
    if err == nil {
        // Field 2 (comm) is the executable name in parentheses and may itself
        // contain spaces or ')', so a plain split on " " can shift every
        // following index. Everything after the LAST ')' is space-separated
        // and starts with field 3 (state).
        if end := bytes.LastIndexByte(stat, ')'); end >= 0 {
            fields := bytes.Fields(stat[end+1:])
            // fields[0] is field 3, hence vsize (23) -> 20 and rss (24) -> 21.
            if len(fields) >= 22 {
                v, errV := strconv.ParseUint(string(fields[20]), 10, 64)
                r, errR := strconv.ParseUint(string(fields[21]), 10, 64)
                if errV == nil && errR == nil {
                    // rss is counted in pages; ask the OS for the page size
                    // instead of assuming 4 KiB.
                    return v, r * uint64(syscall.Getpagesize())
                }
            }
        }
    }

    var ru syscall.Rusage
    if err = syscall.Getrusage(syscall.RUSAGE_SELF, &ru); err == nil {
        // NOTE(review): the unit of Maxrss is platform dependent (kilobytes on
        // Linux, bytes on macOS per the man pages); it is returned unscaled,
        // as before — confirm whether callers expect bytes here.
        return uint64(ru.Maxrss), uint64(ru.Maxrss)
    }
    return 0, 0
}
代码超时封装
// WithTimeout runs f in a separate goroutine and waits for it for at most
// timeout.
//
// It returns f's error if f finishes in time, otherwise an error of the form
// "timeout after <timeout>". On timeout f is NOT cancelled: its goroutine
// keeps running until f returns, and its result is discarded.
func WithTimeout(f func() error, timeout time.Duration) error {
    t := time.NewTimer(timeout)
    defer t.Stop()

    // The result travels over the channel instead of a shared variable, so a
    // late-finishing f can never race with (or overwrite) the timeout error.
    // Capacity 1 lets the goroutine deliver and exit even when nobody is
    // receiving any more.
    done := make(chan error, 1)
    go func() {
        done <- f()
    }()

    select {
    case err := <-done:
        return err
    case <-t.C:
        return fmt.Errorf("timeout after %s", timeout)
    }
}
cond

相比于 sync.Cond，额外提供了可以超时的 Wait（WaitWithTimeout）。

// Cond is similar to sync.Cond, but you can wait with a timeout.
//
// Waiters take a snapshot of the current signal channel while holding L and
// then block on it with L released. Signal sends one value on that channel,
// Broadcast closes it and installs a fresh one.
type Cond struct {
    // L is held by callers around Wait, WaitWithTimeout, Signal and Broadcast.
    L sync.Locker
    // signal is unbuffered: a send wakes one waiter, a close wakes all.
    signal chan struct{}
}

// Signal wakes up a waiter.
// It's required for the caller to hold L.
//
// The send is non-blocking: if no goroutine is blocked receiving at this
// moment (including a waiter that has released L but not reached its receive
// yet), the signal is dropped.
func (c *Cond) Signal() {
    select {
    case c.signal <- struct{}{}:
    default:
    }
}

// Broadcast wake up all the waiters.
// It's required for the caller to hold L.
func (c *Cond) Broadcast() {
    // Closing releases every goroutine holding a snapshot of this channel;
    // later waiters use the replacement.
    close(c.signal)
    c.signal = make(chan struct{})
}

// Wait until Signal() or Broadcast() is called.
// The caller must hold L; it is released while waiting and re-acquired
// before Wait returns.
func (c *Cond) Wait() {
    ch := c.signal // snapshot under L so a following Broadcast is not missed
    c.L.Unlock()
    <-ch
    c.L.Lock()
}

// timerPool recycles timers used by WaitWithTimeout. Every timer in the pool
// is stopped and has an empty channel, so Reset can be called on it safely.
var timerPool = sync.Pool{
    New: func() interface{} {
        t := time.NewTimer(time.Hour)
        t.Stop() // hand out a stopped timer; the caller arms it with Reset
        return t
    },
}

// WaitWithTimeout wait for a signal or a period of timeout eclipsed.
// returns true in case of timeout else false
//
// The caller must hold L; it is released while waiting and re-acquired
// before WaitWithTimeout returns.
func (c *Cond) WaitWithTimeout(d time.Duration) bool {
    ch := c.signal
    c.L.Unlock()
    t := timerPool.Get().(*time.Timer)
    t.Reset(d)
    defer func() {
        // If the timer fired but the signal branch won the select, the fire
        // may still sit in t.C. Drain it so the next user of this pooled
        // timer does not observe a stale expiry as an immediate timeout.
        if !t.Stop() {
            select {
            case <-t.C:
            default:
            }
        }
        timerPool.Put(t)
        c.L.Lock()
    }()
    select {
    case <-ch:
        return false
    case <-t.C:
        return true
    }
}

// NewCond creates a Cond.
func NewCond(lock sync.Locker) *Cond {
    return &Cond{lock, make(chan struct{})}
}
内存池
// used is the total capacity, in bytes, of buffers handed out by Alloc and
// not yet passed back to Free. It is only accessed atomically.
var used int64

// AllocMemory reports the current value of the used counter.
func AllocMemory() int64 {
    inUse := atomic.LoadInt64(&used)
    return inUse
}

// Alloc returns a byte slice of length size taken from the pool whose index
// is powerOf2(size). The capacity of the returned buffer is added to the
// used counter.
//
// It panics if the buffer obtained from the pool has a capacity smaller than
// size.
func Alloc(size int) []byte {
    bucket := powerOf2(size)
    buf := *pools[bucket].Get().(*[]byte)
    if capacity := cap(buf); capacity < size {
        panic(fmt.Sprintf("%d < %d", capacity, size))
    }
    atomic.AddInt64(&used, int64(cap(buf)))
    return buf[:size]
}

// Free gives a buffer obtained from Alloc back to its pool and subtracts its
// capacity from the used counter.
//
// A buffer whose capacity is zero, is not an exact power of two, or is
// larger than the biggest pool is not pooled and is left to the garbage
// collector. Pooling it would put a too-small buffer into a bucket (making a
// later Alloc panic) or index past the end of pools.
func Free(b []byte) {
    c := cap(b)
    atomic.AddInt64(&used, -int64(c))

    bits := powerOf2(c)
    // bits is range-checked before it is used as a shift count.
    if c == 0 || bits >= len(pools) || c != 1<<bits {
        return
    }
    pools[bits].Put(&b)
}

// pools is the table of buffer pools, indexed by the exponent computed by
// powerOf2. It is filled in by init.
var pools []*sync.Pool

// powerOf2 returns the smallest exponent e such that 1<<e >= s.
// For example powerOf2(2) == 1 and powerOf2(5) == 3; any s <= 1 yields 0.
func powerOf2(s int) int {
    exp := 0
    for limit := 1; limit < s; limit <<= 1 {
        exp++
    }
    return exp
}

// init builds one pool per power-of-two buffer size, from 1 byte (2^0) up to
// 1 GiB (2^30), and starts a background goroutine that forces a garbage
// collection every ten minutes.
func init() {
    // Exponents 0..maxBits inclusive need maxBits+1 pools. With only 30
    // entries the 2^30 bucket was missing and Alloc for sizes above 512 MiB
    // indexed past the end of the slice.
    const maxBits = 30
    pools = make([]*sync.Pool, maxBits+1)
    for i := range pools {
        // Each New func must capture its own copy of the exponent. Capturing
        // a loop variable shared by all iterations (Go < 1.22) would make
        // every pool allocate buffers of the last size.
        bits := i
        pools[i] = &sync.Pool{
            New: func() interface{} {
                b := make([]byte, 1<<bits)
                return &b
            },
        }
    }

    // This goroutine runs for the lifetime of the process.
    // NOTE(review): presumably the periodic GC is there so idle pooled
    // buffers get released — confirm the intent.
    go func() {
        for {
            time.Sleep(time.Minute * 10)
            runtime.GC()
        }
    }()
}
image.png
page

page 对上面内存池中申请的内存进行引用计数的管理

import (
    "errors"
    "io"
    "os"
    "runtime"
    "runtime/debug"
    "sync/atomic"
)

// pageStack is true when the JFS_PAGE_STACK environment variable is set to a
// non-empty value. When true, pages record call stacks on creation, acquire
// and release.
var pageStack = os.Getenv("JFS_PAGE_STACK") != ""

// Page is a page with refcount
type Page struct {
    // refs is the reference count; it is modified with atomic operations.
    refs    int32
    // offHeap marks pages whose Data came from Alloc and is passed to Free
    // when the count reaches zero.
    offHeap bool
    // dep is the page this one was sliced from, if any; it is released when
    // this page's count reaches zero.
    dep     *Page
    // Data is the page content; it is set to nil once the count reaches zero.
    Data    []byte
    // stack accumulates debug.Stack() output when pageStack is enabled.
    stack   []byte
}

// NewOffPage creates a page of the given size backed by a buffer from Alloc,
// with an initial reference count of 1. It panics if size is not positive.
//
// A finalizer is registered that releases the page once if it is collected
// while its reference count is still positive.
func NewOffPage(size int) *Page {
    if size <= 0 {
        panic("size of page should > 0")
    }

    page := &Page{
        refs:    1,
        offHeap: true,
        Data:    Alloc(size),
    }
    if pageStack {
        page.stack = debug.Stack()
    }

    runtime.SetFinalizer(page, func(p *Page) {
        // Only a page with outstanding references still needs releasing.
        if atomic.LoadInt32(&p.refs) > 0 {
            p.release()
        }
    })
    return page
}

// Slice returns a new page viewing p.Data[off : off+len]. The new page starts
// with a reference count of 1 and holds a reference on p, which is dropped
// when the new page's count reaches zero.
func (p *Page) Slice(off, len int) *Page {
    p.acquire() // the view keeps its parent alive
    view := p.Data[off : off+len]
    return &Page{refs: 1, dep: p, Data: view}
}

// acquire increments the reference count. When pageStack is enabled, the
// caller's stack trace is appended to the page's stack record first.
func (p *Page) acquire() {
    if pageStack {
        trace := debug.Stack()
        p.stack = append(p.stack, trace...)
    }

    atomic.AddInt32(&p.refs, 1)
}

// release decrements the reference count. When the count reaches zero, an
// off-heap buffer is handed to Free, the reference on the parent page (dep)
// is dropped, and Data is cleared. When pageStack is enabled, the caller's
// stack trace is appended to the page's stack record first.
func (p *Page) release() {
    if pageStack {
        trace := debug.Stack()
        p.stack = append(p.stack, trace...)
    }

    if remaining := atomic.AddInt32(&p.refs, -1); remaining != 0 {
        return
    }

    // Last reference gone: tear the page down.
    if p.offHeap {
        Free(p.Data)
    }
    if parent := p.dep; parent != nil {
        parent.release()
        p.dep = nil
    }
    p.Data = nil
}

// PageReader reads the content of a Page sequentially (Read) or at a given
// offset (ReadAt). It holds a reference on the page until Close is called.
type PageReader struct {
    // p is the page being read; it is set to nil by Close.
    p   *Page
    // off is the position of the next sequential Read within p.Data.
    off int
}

// NewPageReader returns a reader positioned at the start of p. It takes a
// reference on p, which Close gives back.
func NewPageReader(p *Page) *PageReader {
    p.acquire()
    reader := &PageReader{p: p, off: 0}
    return reader
}

// Read copies data from the current position into buf by delegating to
// ReadAt, then advances the position by the number of bytes copied.
func (r *PageReader) Read(buf []byte) (int, error) {
    pos := int64(r.off)
    copied, err := r.ReadAt(buf, pos)
    r.off += copied
    return copied, err
}

// ReadAt copies data starting at offset off of the page into buf and returns
// the number of bytes copied.
//
// It returns (0, nil) for an empty buf, an error if the reader has been
// closed or off is negative, and io.EOF when off is at or beyond the end of
// the page or when fewer than len(buf) bytes were available.
func (r *PageReader) ReadAt(buf []byte, off int64) (int, error) {
    if len(buf) == 0 {
        return 0, nil
    }
    if r.p == nil {
        return 0, errors.New("page is already released")
    }
    if off < 0 {
        return 0, errors.New("negative offset")
    }

    data := r.p.Data
    // An offset past the end used to reach the slice expression below and
    // panic; only an offset exactly at the end was treated as EOF.
    if off >= int64(len(data)) {
        return 0, io.EOF
    }

    n := copy(buf, data[off:])
    if n < len(buf) {
        return n, io.EOF
    }
    return n, nil
}

// Close drops the reader's reference on its page. Calling it again is a
// no-op. It always returns nil.
func (r *PageReader) Close() error {
    if r.p == nil {
        return nil
    }
    r.p.release()
    r.p = nil
    return nil
}
singleflight
prefetch
上一篇下一篇

猜你喜欢

热点阅读