groupcache源码中几个有趣的点

2019-01-20  本文已影响0人  辰辰沉沉沉

简介

groupcache is a caching and cache-filling library, intended as a replacement for memcached in many cases.

groupcache是一个可分布式缓存组件,用于在某些方面替代memcache,不过和一般的缓存有些区别,它只能做get操作(没错,只能get),但是不能做更新和删除操作。另外,groupcache可以很方便的集成到应用程序中,用http接口的形式与其他程序交互。

几个概念

几个文件

基本使用方式

创建group

// 需要提供一个获取的getter方法
var Group = groupcache.NewGroup("colorCache", 64<<20, groupcache.GetterFunc(
    func(ctx groupcache.Context, key string, dest groupcache.Sink) error {
        log.Println("looking up", key)
        // 这里从数据库中获取一个值 
        v, ok := Store[key]
        if !ok {
            return errors.New("color not found")
        }
        dest.SetBytes(v)
        return nil
    },
))

设置peers并启动服务

// peer是本缓存服务服务的地址 如http://localhost:8080
pool := groupcache.NewHTTPPool(peer)
// ps是一组缓存服务器的地址
pool.Set(ps...)
http.ListenAndServe(*addr, nil)

获取缓存

var b []byte
// 获取缓存结果,存在b中。AllocatingByteSliceSink是一个对获取数据的包装
err := Group.Get(nil, color, groupcache.AllocatingByteSliceSink(&b))

几个有趣的点

peer的查询

给定一个key,groupcache会在本地找不到缓存的情况下,查询该key应该存在的peer。为了在新增或删除peer的时候尽量少的缓存失效,groupcache使用一致性hash的方案,并提供了一个consistenthash的实现,就在consistenthash/consistenthash.go中。

不赘述什么是一致性hash了,直接以注释分析的方式给出代码

type Hash func(data []byte) uint32

// hash 一个hash算法,默认为crc32.ChecksumIEEE
// replicas 每个peer节点会产生几个虚拟节点
// keys 储存所有虚拟节点的hash值
// hashMap 根据hash值找到相应的peer
type Map struct {
    hash     Hash
    replicas int
    keys     []int // Sorted
    hashMap  map[int]string
}

func New(replicas int, fn Hash) *Map {
    m := &Map{
        replicas: replicas,
        hash:     fn,
        hashMap:  make(map[int]string),
    }
    if m.hash == nil {
        m.hash = crc32.ChecksumIEEE
    }
    return m
}

// Returns true if there are no items available.
func (m *Map) IsEmpty() bool {
    return len(m.keys) == 0
}

// Adds some keys to the hash.
func (m *Map) Add(keys ...string) {
    for _, key := range keys {
      // 一个key就是一个peer节点
      // 对于每个key,都会生成replicas个虚拟节点,并把虚拟节点和key的对应关系存起来
        for i := 0; i < m.replicas; i++ {
            hash := int(m.hash([]byte(strconv.Itoa(i) + key)))
            m.keys = append(m.keys, hash)
            m.hashMap[hash] = key
        }
    }
  // 排序 方便后边查找
    sort.Ints(m.keys)
}

// 这边的key为一个cache中的key,即要查询的key
// 计算这个key的hash值,找到与之最为接近的虚拟节点,再通过虚拟节点找到具体的peer
func (m *Map) Get(key string) string {
    if m.IsEmpty() {
        return ""
    }

  // 计算key对应的hash值
    hash := int(m.hash([]byte(key)))

    // 使用二分法查找key对应的hash值最近的虚拟节点
    idx := sort.Search(len(m.keys), func(i int) bool { return m.keys[i] >= hash })

    // 一致性hash的虚拟节点应该构成一个环
    if idx == len(m.keys) {
        idx = 0
    }

    return m.hashMap[m.keys[idx]]
}

很经典的实现。

cache的查找过程

简单的说,就是Group.Get(nil, color, groupcache.AllocatingByteSliceSink(&b))发生了什么。

func (g *Group) Get(ctx Context, key string, dest Sink) error {
  // ... (省略一些不重要的代码)
  // 查询本地的缓存,很简单的方法
    value, cacheHit := g.lookupCache(key)

  // 如果命中,统计完直接返回
    if cacheHit {
        g.Stats.CacheHits.Add(1)
        return setSinkView(dest, value)
    }

    // 这边是关键 
    destPopulated := false
    value, destPopulated, err := g.load(ctx, key, dest)
    if err != nil {
        return err
    }
    if destPopulated {
        return nil
    }
    return setSinkView(dest, value)
}

如代码的注释,关键在于在本地没找到缓存的情况下,需要找到合适的peer(可能是自己)去计算出值。这里需要考虑一个问题,就是如果本地有两个协程同时查询一个key怎么办,groupcache会阻塞其中一个,让另一个先跑出结果,避免调用两次获取方法。destPopulated标记该值是不是由该peer新计算出来的(本地方法获取后会直接赋值dest)。

让我们看一下这边的load方法

func (g *Group) load(ctx Context, key string, dest Sink) (value ByteView, destPopulated bool, err error) {
    g.Stats.Loads.Add(1)
  // loadGroup.Do是个竞争的方法,相同的key同时只会有一个访问
    viewi, err := g.loadGroup.Do(key, func() (interface{}, error) {
      // 可能存在两个并发访问了这个方法,后执行的协程可能可以直接从缓存中读入
        if value, cacheHit := g.lookupCache(key); cacheHit {
            g.Stats.CacheHits.Add(1)
            return value, nil
        }
        g.Stats.LoadsDeduped.Add(1)
        var value ByteView
        var err error
      // 根据上边的一致性hash,找到peer
        if peer, ok := g.peers.PickPeer(key); ok {
          // 从peer中获取值
            value, err = g.getFromPeer(ctx, peer, key)
            if err == nil {
                g.Stats.PeerLoads.Add(1)
                return value, nil
            }
            g.Stats.PeerErrors.Add(1)
        }
      // 不需要访问远端peer,可以直接自己获取
        value, err = g.getLocally(ctx, key, dest)
        if err != nil {
            g.Stats.LocalLoadErrs.Add(1)
            return nil, err
        }
        g.Stats.LocalLoads.Add(1)
        destPopulated = true // only one caller of load gets this return value
        // 缓存起来
         g.populateCache(key, value, &g.mainCache)
        return value, nil
    })
    if err == nil {
        value = viewi.(ByteView)
    }
    return
}

还需要解释最后一个问题,如何控制并发的访问。这边使用的是g.loadGroup.Do方法,loadGroup是一个singleflight.Group。让我们看一下这个Do方法做了啥。

package singleflight

import "sync"

// 包装一个key获取值锁需要的一些参数
type call struct {
    wg  sync.WaitGroup
    val interface{}
    err error
}

type Group struct {
    mu sync.Mutex       // protects m
    m  map[string]*call // lazily initialized
}

func (g *Group) Do(key string, fn func() (interface{}, error)) (interface{}, error) {
    g.mu.Lock()
  // 懒加载
    if g.m == nil {
        g.m = make(map[string]*call)
    }
  // 已经有协程在处理了,阻塞(c.wg.Wait())等待完成
    if c, ok := g.m[key]; ok {
        g.mu.Unlock()
        c.wg.Wait()
        return c.val, c.err
    }
  // 目前没有协程在处理,新建一个处理的任务
    c := new(call)
    c.wg.Add(1)
    g.m[key] = c
    g.mu.Unlock()

    c.val, c.err = fn()
    c.wg.Done()

    g.mu.Lock()
    delete(g.m, key)
    g.mu.Unlock()

    return c.val, c.err
}

这个方法还是比较简单的,主要就是使用sync.WaitGroup来保证只有一个协程在真的调用fn方法获取值,剩下的都在等待获取完成。

小结

看groupcache源码纯粹只是为了学习优秀的go的代码,并没有在实际中使用过… 不过这代码还是很有意思的。

上一篇 下一篇

猜你喜欢

热点阅读