Consul 的一致性读分析

2023-07-13  本文已影响0人  绝尘驹

背景

Consul 作为HashiCorp 出品的分布式注册中心和配置中心,是cp模型的,即强调一致性,通过raft协议实现

一致性

consul 一致性支持三种模式,即要强一致还是,最终一致, 可以交个用户选择,这才是一个优秀的分布式系统应该具备的,要了解一致性读,需要先了解consul的三种一致性模式,如下:

我们了解了consul支持三种一致性模式,你是不是很好奇,consul是怎么实现的呢,我们平时部署一个consul集群也没有让我指定是那一种啥,consul既然是交给用户来选择,所以consul通过api的参数来确定,需要用那种读一致性。

在哪里指定一致性级别

有聪明的同学就会问,说了这么多,我到底在哪里指定这个一致性级别,别急,下面就开始说

consul 通过http 接口提供服务,就在http的api里可以指定,客户端sdk就不说了,有很多版本,这里只说consul agent端,因为线上一般都是直接请求localhost:8500 访问本地的consul agent的。下面是所有consul agent http接口都要执行的一个逻辑parseConsistency,就是解析一致性

func (s *HTTPServer) parseConsistency(resp http.ResponseWriter, req *http.Request, b structs.QueryOptionsCompat) bool {
    query := req.URL.Query()
    //这里默认就认为是default模式。
    defaults := true
    //解析http请求如果带了stale参数,则是允许读过期的数据,那就server不用转发给leader
    if _, ok := query["stale"]; ok {
        b.SetAllowStale(true)
        defaults = false
    }
    //解析http请求如果带了consistent参数,代表要读最新的数据。
    if _, ok := query["consistent"]; ok {
        b.SetRequireConsistent(true)
        defaults = false
    }
    //解析http请求如果带了consistent参数,代表要从leader读。
    if _, ok := query["leader"]; ok {
        defaults = false
    }
    //解析http请求如果带了cached参数,代表可以从agent读,不需要请求server
    if _, ok := query["cached"]; ok {
        b.SetUseCache(true)
        defaults = false
    }
    if maxStale := query.Get("max_stale"); maxStale != "" {
        dur, err := time.ParseDuration(maxStale)
        if err != nil {
            resp.WriteHeader(http.StatusBadRequest)
            fmt.Fprintf(resp, "Invalid max_stale value %q", maxStale)
            return true
        }
        b.SetMaxStaleDuration(dur)
        if dur.Nanoseconds() > 0 {
            b.SetAllowStale(true)
            defaults = false
        }
    }
...

上面解析了客户端的读模式,下面看怎么用的,随便看一个consul读的代码,比如查看健康的service node 的一段代码:

//如果可以用cache的数据,则直接从当前agent响应。
    if args.QueryOptions.UseCache {
        raw, m, err := s.agent.cache.Get(cachetype.HealthServicesName, &args)
        if err != nil {
            return nil, err
        }
        defer setCacheMeta(resp, &m)
        reply, ok := raw.(*structs.IndexedCheckServiceNodes)
        if !ok {
            // This should never happen, but we want to protect against panics
            return nil, fmt.Errorf("internal error: response type not correct")
        }
        out = *reply
    } else {
    //否则需要通过rpc请求server节点。  
    RETRY_ONCE:
        if err := s.agent.RPC("Health.ServiceNodes", &args, &out); err != nil {
            return nil, err
        }
        if args.QueryOptions.AllowStale && args.MaxStaleDuration > 0 && args.MaxStaleDuration < out.LastContact {
            args.AllowStale = false
            args.MaxStaleDuration = 0
            goto RETRY_ONCE
        }
    }

我们只有指定了cache参数,consul 才会从agent 本地直接响应数据,这里也可以看出,agent 是会缓存数据的,否则就需要请求server节点,这个时候问题又来了,server节点一般我们是一个集群,最少3个节点,那请求那一个呢,有负载均衡吗,带着这个问题,我们看下代码,怎么选server的, 代码如下:

// FindServer takes out an internal "read lock" and searches through the list
// of servers to find a "healthy" server.  If the server is actually
// unhealthy, we rely on Serf to detect this and remove the node from the
// server list.  If the server at the front of the list has failed or fails
// during an RPC call, it is rotated to the end of the list.  If there are no
// servers available, return nil.
func (m *Manager) FindServer() *metadata.Server {
    l := m.getServerList()
    numServers := len(l.servers)
    if numServers == 0 {
        m.logger.Warn("No servers available")
        return nil
    }

    // Return whatever is at the front of the list because it is
    // assumed to be the oldest in the server list (unless -
    // hypothetically - the server list was rotated right after a
    // server was added).
    return l.servers[0]
}

consul 这里是不是处理的很简单,每次都是取第一个,人家注释也说了,如果这个出现失败了,会移到最后。

Consul Server的逻辑

consul agent 发现不用本地cache的数据,那就要rpc请求server节点,server节点接受到任何请求,都会执行forward方法,来检查是否要转发请求还是就自己响应数据。

func (s *Server) forward(method string, info structs.RPCInfo, args interface{}, reply interface{}) (bool, error) {
    var firstCheck time.Time

    // Handle DC forwarding
    // 检查dc是否一致,不一致就要转发到正确的dc
    dc := info.RequestDatacenter()
    if dc != s.config.Datacenter {
        // Local tokens only work within the current datacenter. Check to see
        // if we are attempting to forward one to a remote datacenter and strip
        // it, falling back on the anonymous token on the other end.
        if token := info.TokenSecret(); token != "" {
            done, ident, err := s.ResolveIdentityFromToken(token)
            if done {
                if err != nil && !acl.IsErrNotFound(err) {
                    return false, err
                }
                if ident != nil && ident.IsLocal() {
                    // Strip it from the request.
                    info.SetTokenSecret("")
                    defer info.SetTokenSecret(token)
                }
            }
        }

        err := s.forwardDC(method, dc, args, reply)
        return true, err
    }

    // Check if we can allow a stale read, ensure our local DB is initialized
    // 这里server开始检查读一致性,如果允许读过期的数据,则直接用当前server的数据。
    // 不需要后面的检查是否为leader了。
    if info.IsRead() && info.AllowStaleRead() && !s.raft.LastContact().IsZero() {
        return false, nil
    }

CHECK_LEADER:
    // Fail fast if we are in the process of leaving
    select {
    case <-s.leaveCh:
        return true, structs.ErrNoLeader
    default:
    }

    // Find the leader
    // 到这里就是要default读或者consistent读,都需要从leader读数据。
    isLeader, leader := s.getLeader()

    // Handle the case we are the leader
    // 如果当前是leader,则不需要再转发到leader了。
    if isLeader {
        return false, nil
    }

    // Handle the case of a known leader
    // 不是leader,则需要再转发到leader节点,多一次网络请求。
    rpcErr := structs.ErrNoLeader
    if leader != nil {
        rpcErr = s.connPool.RPC(s.config.Datacenter, leader.ShortName, leader.Addr,
            leader.Version, method, leader.UseTLS, args, reply)
        if rpcErr != nil && canRetry(info, rpcErr) {
            goto RETRY
        }
        return true, rpcErr
    }

有同学看到这里,不是还有consistent 模式没有讲吗,这个就不在分析了, 不然文章太长了,没有人看

总结

一写就这么多,总算把consul的一致性读的特性,怎么用的,和背后的原理给说明了,我们默认情况都是default模式,即请求都是需要通过访问agent,agent再请求server,如果server不是leader,还要转发到leader节点。要1次http,2次rpc才能获取到数据,所以如果有consul server压力大的,可以通过cache来缓解server特别是leader的压力。

上一篇 下一篇

猜你喜欢

热点阅读