Go Cache

2021-03-19  本文已影响0人  JunChow520

缓存

缓存最简单的莫过于存储在内存中的键值对,键值对在Golang中称之为map。使用map做内存缓存时,每次有新数据就向map中插入数据就可以了吗?这样做存在的问题是什么呢?

如何设计分布式缓存系统,需要考虑资源控制、淘汰策略、并发、分布式节点通信等各个方面的问题。针对不同的应用场景,需要在不同的特性之间权衡。例如:是否需要支持缓存更新?还是假定缓存在淘汰之前是不允许改变的。不同的权衡对应着不同的实现。

参考应用 groupcache

缓存特性

淘汰策略

由于缓存全部存储在内存中,内存本身是有限的,因此不可能无限制地添加数据。

假如设置缓存能够使用内存大小为N,在某个时间点添加某一条缓存记录后,占用内存超过了N,此时就需要从缓存中移除一条或多条数据。那移除谁呢?肯定希望尽可能移除“没用”的数据,那如何判断数据“有用”还是“没用”呢?

常见的缓存淘汰策略分为三种:FIFO/LFU/LRU

FIFO: First In First Out 先进先出

FIFO先进先出即淘汰缓存中最早添加的也就是最老的记录

FIFO认为最早添加的记录其不再被使用的可能性会被刚添加的可能性大

FIFO算法实现,创建一个队列,新增记录添加到队尾,每次内存不够时淘汰队首。

FIFO缺陷在于很多场景下,部分记录虽然是最早添加的但也最常被访问,而不得不因为呆的时间太长而被淘汰,此类数据会被频繁地添加缓存,又被淘汰出来,导致缓存命中率低。

LFU: Least Frequently Used 最少频繁使用

LFU是淘汰缓存中访问频率最低的记录

LFU认为若数据过去被访问多次,那么将来被访问的频率也更高。

LFU的实现需要维护一个按照访问次数排序的队列,每次访问时访问次数加1,队列重新排序,淘汰时选择访问次数最少的即可。

LFU算法的命中率比较高,缺点在于需要维护每个记录的访问次数,对内存的消耗是很高的。另外,如果数据的访问模式发生变化,LFU需要较长的时间去适应,也就是说LFU算法受历史数据的影响比较大。

例如:某个数据历史上访问次数奇高,但在某个时间点之后几乎不再被访问,但因为历史访问次数过高,而迟迟不能被淘汰。

LRU: Least Recently Used 最近最少使用

LRU最近最少使用,相对于仅考虑时间因素的FIFO和仅考虑访问频率的LFU,LRU算法可认为是相对平衡的一种淘汰算法。

LRU核心思想是若数据最近访问过,那么将来被访问的概率也会更高。其实现方式是使用一个链表保存数据,当新数据插入到链表头部时,每当缓存命中(即缓存数据被访问)则将数据移动到链表头部。当链表满时将链表尾部数据丢弃。

LRU算法的实现需维护一个队列,如果某条记录被访问了,则移动到队尾,那么队首则是最近最少访问的数据,淘汰该条记录即可。

LRU算法

LRU算法最核心的2个数据结构

LRU算法实现

package web

import "container/list"

//Value 接口
type Value interface {
    Len() int //值占用的内存大小
}

//LRU 缓存 Least Recently Used 最近最少使用
type LRU struct {
    dll       *list.List                    //双向链表 Double Linked List
    dict      map[string]*list.Element      //字典键值对
    maxBytes  int64                         //最大可用内存
    usedBytes int64                         //当前已用内存
    onEvicted func(key string, value Value) //记录删除时触发的回调函数
}

实例化创建缓存

//NewLRU 创建缓存
func NewLRU(maxBytes int64, onEvicted func(string, Value)) *LRU {
    return &LRU{
        dll:       list.New(),
        dict:      make(map[string]*list.Element),
        maxBytes:  maxBytes,
        onEvicted: onEvicted,
    }
}

获取数据条数

//Len 实现接口获取数据条数
func (l *LRU) Len() int {
    return l.dll.Len()
}

缓存查询

//Entry 字典实体结构
type Entry struct {
    key   string
    value Value
}

//Get 根据键名查找键值
func (l *LRU) Get(key string) (value Value, ok bool) {
    //判断字典中是否存在键
    ele, ok := l.dict[key]
    if !ok {
        return nil, false
    }
    //将目标节点移动至队尾
    l.dll.MoveToFront(ele)
    //获取值并转换格式
    kv := ele.Value.(*Entry)
    //返回数据
    return kv.value, true
}

读取时从map中查询,若能查询到值则直接将List中该值移动到链表头部同时返回查询结果。

缓存删除

//Eliminate 淘汰策略 删除节点
func (l *LRU) Eliminate() {
    //获取队首元素
    ele := l.dll.Back()
    if ele == nil {
        return
    }
    //移除最近最少访问的节点
    l.dll.Remove(ele)
    //获取字典并删除键值对
    kv := ele.Value.(*Entry)
    delete(l.dict, kv.key)
    //重置可用空间
    l.usedBytes -= int64(len(kv.key)) + int64(kv.value.Len())
    //触发删除回调
    if l.onEvicted != nil {
        l.onEvicted(kv.key, kv.value)
    }
}

新建或修改

//Add 新增或更新键值对
func (l *LRU) Add(key string, value Value) {
    //判断键是否存在
    if ele, ok := l.dict[key]; ok {
        //更新 将节点移动至队尾
        l.dll.MoveToFront(ele)
        //获取字典键值对
        kv := ele.Value.(*Entry)
        //更新已使用大小
        l.usedBytes += int64(value.Len()) - int64(kv.value.Len())
        //更新字典
        kv.value = value
    } else {
        //添加
        entry := &Entry{key, value}
        ele := l.dll.PushFront(entry)
        l.dict[key] = ele
        l.usedBytes += int64(len(key)) + int64(value.Len())
    }
    //淘汰策略
    for l.maxBytes != 0 && l.maxBytes < l.usedBytes {
        l.Eliminate()
    }
}

测试

package main

import (
    "fmt"
    "gfw/web"
)

type String string

func (str String) Len() int {
    return len(str)
}

func main() {
    k1, k2, k3 := "id", "name", "pid"
    v1, v2, v3 := "1", "admin", "0"
    cap := len(k1 + k2 + v1 + v2)

    keys := make([]string, 0)
    lru := web.NewLRU(int64(cap), func(key string, val web.Value) {
        fmt.Printf("DEL:key = %v, val = %v\n", key, val)
        keys = append(keys, key)
    })
    lru.Add(k1, String(v1))
    fmt.Printf("ADD:%v\n", lru)
    lru.Add(k2, String(v2))
    fmt.Printf("ADD:%v\n", lru)
    lru.Add(k3, String(v3))
    fmt.Printf("ADD:%v\n", lru)

    val, ok := lru.Get(k3)
    if !ok {
        panic("cache get error")
    }
    fmt.Printf("GET:key = %v, val = %v, type = %T, v = %v\n", k3, val, val, string(val.(String)))

    if !reflect.DeepEqual(keys, []string{k1, k2}) {
        panic("call OnEvicated failed")
    }
}

单机并发

当多个goroutine同时读写同一个变量,在并发度较高的情况下会发生冲突。为确保每次只有一个goroutine可以访问变量以避免冲突,称之为互斥。

解决互斥问题可使用互斥锁sync.Mutexsync.Mutex是一个互斥锁,可由不同的goroutine加锁和解锁。

sync.Mutex是Golang提供的一个互斥锁,当一个goroutine获得互斥锁的拥有权后,其他请求锁的goroutine会阻塞在Lock()方法的调用上,直到调用Unlock()锁被释放。

缓存值

$ vim byte_view.go
package web

//ByteView 只读数据结构用于表示缓存值
type ByteView struct {
    data []byte //缓存值 只读属性 byte类型可支持任意数据类型
}

//Clone 设置data属性为只读
//返回拷贝以防止缓存值被外部程序修改
func (bv ByteView) Clone() []byte {
    bs := make([]byte, len(bv.data))
    copy(bs, bv.data)
    return bs
}

//Len 缓存对象必须事项Value接口的Len方法以获取占用内存大小
func (bv ByteView) Len() int {
    return len(bv.data)
}

//String 将缓存值转换为字符串
func (bv ByteView) String() string {
    return string(bv.data)
}

缓存

延迟初始化(Lazy Initialization),一个对象的延迟初始化意味着该对象的创建将会延迟至第一次使用该对象时,主要用于提高性能,并减少程序内存要求。

$ vim cache.go
package web

import "sync"

//cache 缓存
type cache struct {
    mutex sync.Mutex //互斥锁
    lru   *LRU       //LRU淘汰策略
    size  int64      //缓存最大尺寸
}

//add 添加缓存
func (c *cache) add(key string, value ByteView) {
    //添加锁
    c.mutex.Lock()
    defer c.mutex.Unlock()
    //延迟初始化
    if c.lru == nil {
        c.lru = NewLRU(c.size, nil)
    }
    //添加键值对
    c.lru.Add(key, value)
}

//get 获取缓存
func (c *cache) get(key string) (value ByteView, ok bool) {
    //添加锁
    c.mutex.Lock()
    defer c.mutex.Unlock()
    //判断LRU实例是否存在
    if c.lru == nil {
        return
    }
    //获取键值
    val, ok := c.lru.Get(key)
    if !ok {
        return
    }
    return val.(ByteView), ok
}

回调

回调实现

$ vim callback.go
package web

type Callback interface {
    Call(key string) ([]byte, error)
}

type CallbackFunc func(key string) ([]byte, error)

func (cf CallbackFunc) Call(key string) ([]byte, error) {
    return cf(key)
}

测试

package test

import (
    "gfw/web"
    "reflect"
    "testing"
)

func TestCallback(t *testing.T) {
    //类型转换 将匿名回调函数转换为接口
    var cb web.Callback = web.CallbackFunc(func(key string) ([]byte, error) {
        return []byte(key), nil
    })
    //调用接口方法,即调用匿名回调函数
    v, _ := cb.Call("key")
    //测试
    expect := []byte("key")
    if !reflect.DeepEqual(v, expect) {
        t.Errorf("callback failed")
    }
}
$ go test -v -run TestCallback fn_test.go

定义函数类型F同时实现接口A的方法,在A方法中调用自己。这是Golang中将其它函数(参数返回值定义与F一致)转换为接口A的常用技巧。

命名空间

$ vim ./web/cache_ns.go
package web

import (
    "fmt"
    "log"
    "sync"
)

//全局变量
var (
    rwmutex  sync.RWMutex
    cachenss = make(map[string]*CacheNS)
)

//CacheNS 缓存的命名空间
type CacheNS struct {
    name     string   //缓存命名空间名称
    callback Callback //缓存未命中时获取源数据的回调函数
    cache    Cache    //并发缓存
}

//NewCacheNS 创建缓存命名空间
func NewCacheNS(name string, size int64, cb Callback) *CacheNS {
    if cb == nil {
        panic("nil callback")
    }
    //读写锁
    rwmutex.Lock()
    defer rwmutex.Unlock()
    //实例化
    instance := &CacheNS{name: name, callback: cb, cache: Cache{size: size}}
    cachenss[name] = instance

    return instance
}

//GetCacheNS 创建缓存命名空间
func GetCacheNS(name string) *CacheNS {
    //只读锁
    rwmutex.RLock()
    defer rwmutex.RUnlock()
    //获取映射
    cachens, ok := cachenss[name]
    if !ok {
        return nil
    }

    return cachens
}

//Get 获取数据
func (c *CacheNS) Get(key string) (ByteView, error) {
    //键名存在
    if key == "" {
        return ByteView{}, fmt.Errorf("key is required")
    }
    //缓存命中
    val, ok := c.cache.get(key)
    if ok {
        log.Println("cache hit")
        return val, nil
    }
    //加载源数据
    return c.load(key)
}

//load 加载源数据
func (c *CacheNS) load(key string) (ByteView, error) {
    //获取源数据
    bytes, err := c.callback.Call(key)
    if err != nil {
        return ByteView{}, err
    }
    //将源数据添加到缓存中
    data := CloneBytes(bytes)
    value := ByteView{data: data}
    c.cache.add(key, value)
    //返回数据
    return value, nil
}

测试

$ vim test/cache_test.go
package test

import (
    "fmt"
    "gfw/web"
    "testing"
)

func TestCache(t *testing.T) {
    //缓存键值对数据
    var db = map[string]string{"alice": "90", "bob": "86", "carl": "70"}

    //统计某个键回调次数,回调发生则说明没有缓存。
    loadCounts := make(map[string]int, len(db))

    //创建缓存
    cb := web.CallbackFunc(func(key string) ([]byte, error) {
        //判断键是否存在
        fmt.Printf("search key %v\n", key)
        val, ok := db[key]
        if !ok {
            return nil, fmt.Errorf("%s not exists", key)
        }
        //统计键的回调次数
        _, ok = loadCounts[key]
        if !ok {
            loadCounts[key] = 0
        }
        loadCounts[key] += 1

        return []byte(val), nil
    })
    ns := web.NewCacheNS("scores", 2<<10, cb)

    //遍历键值对
    for k, v := range db {
        //命中缓存
        view, err := ns.Get(k)
        if err != nil || view.String() != v {
            t.Fatal("failed to get value of db")
        }
        //是否缓存
        _, err = ns.Get(k)
        if err != nil || loadCounts[k] > 1 {
            t.Fatalf("cache %s miss", k)
        }
    }

    //非法缓存
    view, err := ns.Get("unknown")
    if err == nil {
        t.Fatalf("the value of unknow should be empty, but %s got", view)
    }
}
$ go test -v -run TestCache cache_test.go
=== RUN   TestCache
search key alice
2021/03/20 15:54:34 cache hit
search key bob
2021/03/20 15:54:34 cache hit
search key carl
2021/03/20 15:54:34 cache hit
search key unknown
--- PASS: TestCache (0.01s)
PASS
ok      command-line-arguments  0.304s

HTTP服务器

分布式缓存需要实现节点间通信,建立基于HTTP的通信机制是比较常见和简单的做法。如果一个节点启动了HTTP服务,那么这个节点就可以被其他节点访问。

缓存节点URL访问规则:主机/前缀/分组/键名

http://example.com/cache/users/id

由于主机可以承载不同服务,对于分布式缓存服务,可以使用/cache/作为默认前缀以示区别。

$ vim ./web/cache_http.go
package web

import (
    "fmt"
    "log"
    "net/http"
    "strings"
)

//HTTPPool 分布式缓存节点,承载缓存节点间HTTP通信的核心数据结构。
type HTTPPool struct {
    addr   string //地址,包括主机名/IP和端口
    prefix string //节点间通讯地址的前缀
}

//NewHTTPPool 创建分布式缓存节点
func NewHTTPPool(addr string, prefix string) *HTTPPool {
    return &HTTPPool{addr: addr, prefix: prefix}
}

//Log 日志记录
func (p *HTTPPool) Log(format string, params ...interface{}) {
    msg := fmt.Sprintf(format, params...)
    log.Printf("[Server] %s %s\n", p.addr, msg)
}

实现http.Handler接口的ServeHTTP方法

//ServeHTTP 实现Handler接口的ServeHTTP方法已转换为HTTP的Handler
func (p *HTTPPool) ServeHTTP(rw http.ResponseWriter, rq *http.Request) {
    //前缀判断
    urlPath := rq.URL.Path
    if !strings.HasPrefix(urlPath, p.prefix) {
        panic("HTTPPool serving unexpected path " + urlPath)
    }
    p.Log("%s %s", rq.Method, urlPath)
    //路径拆分 路径规则 /<prefix>/<group>/<key>
    parts := strings.SplitN(urlPath[len(p.prefix):], "/", 2)
    if len(parts) != 2 {
        http.Error(rw, "BAD REQUEST", http.StatusBadRequest)
        return
    }
    group := parts[0]
    key := parts[1]
    //获取缓存分组
    ns := GetCacheNS(group)
    if ns == nil {
        http.Error(rw, "no such group "+group, http.StatusNotFound)
        return
    }
    //获取键值
    view, err := ns.Get(key)
    if err != nil {
        http.Error(rw, err.Error(), http.StatusInternalServerError)
        return
    }
    bytes := view.Clone()
    //响应
    rw.Header().Set("Content-Type", "application/octet-stream")
    rw.Write(bytes)
}

服务端测试

$ vim ./test/fn_test.go
package test

import (
    "fmt"
    "gfw/web"
    "log"
    "net/http"
    "testing"
)

func TestCache(t *testing.T) {
    //使用map模拟数据源 缓存键值对数据
    var db = map[string]string{"alice": "90", "bob": "86", "carl": "70"}
    //创建名为scores的缓存,若缓存为空则回调函数从db中获取并返回
    cb := web.CallbackFunc(func(key string) ([]byte, error) {
        //判断键是否存在
        fmt.Printf("search key %v\n", key)
        val, ok := db[key]
        if !ok {
            return nil, fmt.Errorf("%s not exists", key)
        }
        return []byte(val), nil
    })
    web.NewCacheNS("scores", 2<<10, cb)

    //创建分布式缓存节点 启动HTTP服务
    addr := "127.0.0.1:9999"
    peers := web.NewHTTPPool(addr, "/cache/")
    log.Println("Cache Peer Server is running at ", addr)
    log.Fatal(http.ListenAndServe(addr, peers))
}
$ go test -v -run TestCache fn_test.go
=== RUN   TestCache
2021/03/20 17:07:39 Cache Peer Server is running at  127.0.0.1:9999
2021/03/20 17:08:26 [Server] 127.0.0.1:9999 GET /cache/scores/alice
search key alice

客户端测试

$ curl -i http://127.0.0.1:9999/cache/scores/alice
HTTP/1.1 200 OK
Content-Type: application/octet-stream
Date: Sat, 20 Mar 2021 09:08:26 GMT
Content-Length: 2

90
上一篇 下一篇

猜你喜欢

热点阅读