groupcache 源码系列二 sigleflight

2019-03-14  本文已影响0人  合肥黑

参考
golang防缓存击穿利器--singleflight
protobuf、LRU、sigleflight
groupcache源码分析(四)-- singleflight

一、缓存击穿

给缓存加一个过期时间,下次未命中缓存时再去从数据源获取结果写入新的缓存,这个是后端开发人员再熟悉不过的基操。本人之前在做直播平台活动业务的时候,当时带着这份再熟练不过的自信,把复杂的数据库链表语句写好,各种微服务之间调用捞数据最后算好的结果,丢进了缓存然后设了一个过期时间,当时噼里啪啦两下写完代码觉得稳如铁蛋,结果在活动快结束之前,数据库很友好的挂掉了。当时回去查看监控后发现,是在活动快结束前,大量用户都在疯狂的刷活动页,导致缓存过期的瞬间有大量未命中缓存的请求直接打到数据库上所导致的,所以这个经典的问题稍不注意还是害死人

防缓存击穿的方式有很多种,比如通过计划任务来跟新缓存使得从前端过来的所有请求都是从缓存读取等等。之前读过 groupCache的源码,发现里面有一个很有意思的库,叫singleFlight, 因为groupCache从节点上获取缓存如果未命中,则会去其他节点寻找,其他节点还没有的话再从数据源获取,所以这个步骤对于防击穿非常有必要。singleFlight使得groupCache在多个并发请求对一个失效的key进行源数据获取时,只让其中一个得到执行,其余阻塞等待到执行的那个请求完成后,将结果传递给阻塞的其他请求达到防止击穿的效果。

二、源码分析

singleFlight这个package主要实现了这样一个功能:抑制同一个函数调用重复执行。举个例子:给一个常规程序输入一个函数调用A()需要10s返回结果,这时候有10个客户端都调用了这个A(),可能就需要100s才能完成所有的计算结果,但是这个计算是重复的,结果也是一样的。所以可以想个办法,判断是同一个计算过程的情况,不需要重复执行,直接等待上一次计算完成,然后一下子返回结果就行了。

该部分就封装了一个接口:func (g *Group) Do(key string, fn func() (interface{}, error)) (interface{}, error)

首先,先定义了下面两个结构体:

//实际请求函数的封装结构体
// call is an in-flight or completed Do call
type call struct {
    wg  sync.WaitGroup
    //实际的请求函数
    val interface{}
    err error
}

//主要是用来组织已经存在的对某key的请求和对应的实际请求函数映射
// Group represents a class of work and forms a namespace in which
// units of work can be executed with duplicate suppression.
type Group struct {
    //用于对m上锁,保护m
    mu sync.Mutex       // protects m
    m  map[string]*call // lazily initialized
}

该函数入参是一个key和一个实际请求函数,出参是一个接口类型和一个错误类型。

// Do executes and returns the results of the given function, making
// sure that only one execution is in-flight for a given key at a
// time. If a duplicate comes in, the duplicate caller waits for the
// original to complete and receives the same results.
func (g *Group) Do(key string, fn func() (interface{}, error)) (interface{}, error) {
    //有可能要修改m,所以先上锁进行保护
    g.mu.Lock()
    //如果m为nil,则初始化一个
    if g.m == nil {
        g.m = make(map[string]*call)
    }
    //如果m中存在对该key的请求,则该线程不会直接再次访问key,所以释放锁
    //然后堵塞等待已经存在的请求得到的结果
    if c, ok := g.m[key]; ok {
        //解锁
        g.mu.Unlock()
        //堵塞
        c.wg.Wait()
        //如果已经存在的请求完成,则堵塞状态会解除,继续向下执行,得到正确结果
        return c.val, c.err
    }
    //如果不存在对该key的请求,则本线程要进行实际的请求,保持m的锁定状态
    //创建一个实际请求结构体
    c := new(call)
    //为了保证其他的相同请求的堵塞
    c.wg.Add(1)
    //组织好映射关系
    g.m[key] = c
    //解锁m
    g.mu.Unlock()
    
    //执行真正的请求函数,得到对该key请求的结果
    c.val, c.err = fn()
    //得到结果后取消其他请求的堵塞
    c.wg.Done()

    //该次请求完成后,要从已存在请求map中删掉
    g.mu.Lock()
    delete(g.m, key)
    g.mu.Unlock()
    
    //返回请求结果
    return c.val, c.err
}

使用实例

package main

import (
    "fmt"
    "sync"
    "time"

    "github.com/golang/groupcache/singleflight"
)

func NewDelayReturn(dur time.Duration, n int) func() (interface{}, error) {
    return func() (interface{}, error) {
        time.Sleep(dur)
        return n, nil
    }
}

func main() {
    g := singleflight.Group{}
    wg := sync.WaitGroup{}
    wg.Add(2)
    go func() {
        ret, err := g.Do("key", NewDelayReturn(time.Second*1, 1))
        if err != nil {
            panic(err)
        }
        fmt.Printf("key-1 get %v\n", ret)
        wg.Done()
    }()
    go func() {
        time.Sleep(100 * time.Millisecond) // make sure this is call is later
        ret, err := g.Do("key", NewDelayReturn(time.Second*2, 2))
        if err != nil {
            panic(err)
        }
        fmt.Printf("key-2 get %v\n", ret)
        wg.Done()
    }()
    wg.Wait()
}

上一篇下一篇

猜你喜欢

热点阅读