[istio source analysis][galley] Galley downstream (mcp)

2020-01-27  nicktming

1. Preface

When reposting, please credit the original source and respect the author's work!

Source code: https://github.com/nicktming/istio
Branch: tming-v1.3.6 (based on version 1.3.6)

1. [istio source analysis][galley] Galley upstream (source)
2. [istio source analysis][galley] Galley runtime
3. [istio source analysis][galley] Galley downstream (mcp)

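The previous article, [istio source analysis][galley] Galley runtime, analyzed the component that links Galley's upstream and downstream halves. This article analyzes the downstream side of that component. It is handled by the MCP server, and every connected MCP client (for example Pilot) receives this information from it.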

2. server

First, let's look at how the server side is initialized.

// galley/pkg/server/components/processing.go
func NewProcessing(a *settings.Args) *Processing {
    d := snapshot.New(groups.IndexFunction)
    return &Processing{
        args:         a,
        distributor:  d,
        configzTopic: configz.CreateTopic(d),
    }
}

p.distributor is an instance of the snapshot cache (analyzed later). Next, look at the Start() method.

func (p *Processing) Start() (err error) {
    // TODO: cleanup

    ...
    types := p.getMCPTypes()
    processorCfg := runtime.Config{
        DomainSuffix:             p.args.DomainSuffix,
        Mesh:                     mesh,
        Schema:                   types,
        SynthesizeServiceEntries: p.args.EnableServiceDiscovery,
    }
    p.processor = runtime.NewProcessor(src, p.distributor, &processorCfg)

    grpcOptions := p.getServerGrpcOptions()

    p.stopCh = make(chan struct{})
    var checker source.AuthChecker = server.NewAllowAllChecker()
    ...
    grpc.EnableTracing = p.args.EnableGRPCTracing
    p.grpcServer = grpc.NewServer(grpcOptions...)

    p.reporter = mcpMetricReporter("galley")

    options := &source.Options{
        Watcher:            p.distributor,
        Reporter:           p.reporter,
        CollectionsOptions: source.CollectionOptionsFromSlice(types.Collections()),
        ConnRateLimiter:    mcprate.NewRateLimiter(time.Second, 100), // TODO(Nino-K): https://github.com/istio/istio/issues/12074
    }

    md := grpcMetadata.MD{
        versionMetadataKey: []string{version.Info.Version},
    }
    if err := parseSinkMeta(p.args.SinkMeta, md); err != nil {
        return err
    }
    ...
    serverOptions := &source.ServerOptions{
        AuthChecker: checker,
        RateLimiter: rate.NewLimiter(rate.Every(time.Second), 100), // TODO(Nino-K): https://github.com/istio/istio/issues/12074
        Metadata:    md,
    }
    p.mcpSource = source.NewServer(options, serverOptions)
    ...
}

Pay attention to two things:
1. The Watcher in options is p.distributor.
2. p.mcpSource = source.NewServer(options, serverOptions) creates the MCP server.

// pkg/mcp/source/server_source.go
func NewServer(srcOptions *Options, serverOptions *ServerOptions) *Server {
    s := &Server{
        src:         New(srcOptions),
        authCheck:   serverOptions.AuthChecker,
        rateLimiter: serverOptions.RateLimiter,
        metadata:    serverOptions.Metadata,
    }
    return s
}
// pkg/mcp/source/source.go
func New(options *Options) *Source {
    s := &Source{
        watcher:        options.Watcher,
        collections:    options.CollectionsOptions,
        reporter:       options.Reporter,
        requestLimiter: options.ConnRateLimiter,
    }
    return s
}

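As you can see, the server's src (a *Source) is built entirely from the options assembled in Processing.Start(). In particular, its watcher is p.distributor.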
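To make this wiring concrete, here is a minimal Go sketch of the contract between Source and its watcher. The Watcher interface is modeled on the Cache.Watch signature quoted later in this article: a request, a push callback, and a peer address, returning a cancel function. The simplified Request/WatchResponse types and fixedWatcher are hypothetical. They only illustrate that any type with this Watch method, such as the snapshot cache, can be plugged in as options.Watcher.

```go
package main

import "fmt"

// Request and WatchResponse are hypothetical, trimmed-down stand-ins for
// source.Request and source.WatchResponse.
type Request struct {
	Collection  string
	VersionInfo string
}

type WatchResponse struct {
	Collection string
	Version    string
}

// Callback shapes as they appear in the Cache.Watch signature.
type (
	PushResponseFunc func(*WatchResponse)
	CancelWatchFunc  func()
)

// Watcher is the contract Source relies on; in Galley the snapshot cache fills it.
type Watcher interface {
	Watch(req *Request, push PushResponseFunc, peerAddr string) CancelWatchFunc
}

// Source keeps the watcher copied from options.Watcher, as New() does.
type Source struct {
	watcher Watcher
}

// fixedWatcher is a hypothetical Watcher that always serves one version.
type fixedWatcher struct{ version string }

var _ Watcher = fixedWatcher{} // compile-time check that the interface is satisfied

func (f fixedWatcher) Watch(req *Request, push PushResponseFunc, _ string) CancelWatchFunc {
	if req.VersionInfo != f.version {
		// The client is behind, so push the current version right away.
		push(&WatchResponse{Collection: req.Collection, Version: f.version})
	}
	return func() {}
}

func main() {
	src := &Source{watcher: fixedWatcher{version: "v1"}}
	src.watcher.Watch(&Request{Collection: "istio/networking/v1alpha3/gateways"}, func(r *WatchResponse) {
		fmt.Println("pushed", r.Collection, r.Version)
	}, "10.0.0.1:5000")
}
```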

2.1 EstablishResourceStream

// pkg/mcp/source/server_source.go
func (s *Server) EstablishResourceStream(stream mcp.ResourceSource_EstablishResourceStreamServer) error {
    ...
    err := s.src.ProcessStream(stream)
    code := status.Code(err)
    if code == codes.OK || code == codes.Canceled || err == io.EOF {
        return nil
    }
    return err
}

The main thing to focus on is the ProcessStream method.

2.2 ProcessStream

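1. Create a connection for this stream via newConnection.
2. Receive requests asynchronously and pass them through a channel (con.requestC) for processing.
3. Loop over events. Take requests from con.requestC and handle them with processClientRequest, and read the responses to be returned to the client from con.queue. This implies that processClientRequest is what leads to responses being assembled and placed into con.queue.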

func (s *Source) ProcessStream(stream Stream) error {
    // create a connection for this client
    con := s.newConnection(stream)

    defer s.closeConnection(con)
    // receive requests in the background
    go con.receive()

    for {
        select {
        case <-con.queue.Ready():
            collection, item, ok := con.queue.Dequeue()
            if !ok {
                break
            }
            resp := item.(*WatchResponse)
            w, ok := con.watches[collection]
            if !ok {
                scope.Errorf("unknown collection in dequeued watch response: %v", collection)
                break // bug?
            }
            // the response may have been cleared before we got to it
            if resp != nil {
                if err := con.pushServerResponse(w, resp); err != nil {
                    return err
                }
            }
        case req, more := <-con.requestC:
            // a request arrived: as expected, the receive method
            // mainly puts requests into con.requestC
            if !more {
                return con.reqError
            }
            if con.limiter != nil {
                if err := con.limiter.Wait(stream.Context()); err != nil {
                    return err
                }

            }
            // handle the request
            if err := con.processClientRequest(req); err != nil {
                return err
            }
        case <-con.queue.Done():
            // the queue has been closed
            scope.Debugf("MCP: connection %v: stream done", con)
            return status.Error(codes.Unavailable, "server canceled watch")
        }
    }
}

First, let's look at how the connection is established.

// pkg/mcp/source/source.go
func (s *Source) newConnection(stream Stream) *connection {
    peerAddr := "0.0.0.0"

    peerInfo, ok := peer.FromContext(stream.Context())
    if ok {
        peerAddr = peerInfo.Addr.String()
    } else {
        scope.Warnf("No peer info found on the incoming stream.")
        peerInfo = nil
    }

    con := &connection{
        stream:   stream,
        peerAddr: peerAddr,
        requestC: make(chan *mcp.RequestResources),
        watches:  make(map[string]*watch),
        watcher:  s.watcher,
        id:       atomic.AddInt64(&s.nextStreamID, 1),
        reporter: s.reporter,
        limiter:  s.requestLimiter.Create(),
        queue:    internal.NewUniqueScheduledQueue(len(s.collections)),
    }

    // create a watch for each collection
    collections := make([]string, 0, len(s.collections))
    for i := range s.collections {
        collection := s.collections[i]
        w := &watch{
            ackedVersionMap: make(map[string]string),
            incremental:     collection.Incremental,
        }
        con.watches[collection.Name] = w
        collections = append(collections, collection.Name)
    }
    ...
    return con
}

As you can see, its main job is to create a watch object for each collection. Next, let's see how the receive function is implemented.
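newConnection also sizes con.queue with internal.NewUniqueScheduledQueue(len(s.collections)). The queue's implementation is not shown in this article. The sketch below assumes it holds at most one pending response per collection, and that a newer Enqueue for the same collection replaces the older one, so a slow client only receives the latest version. The uniqueQueue type and its first-in ordering are our own illustration, not Istio's API.

```go
package main

import "fmt"

// uniqueQueue is a hypothetical stand-in for internal.NewUniqueScheduledQueue:
// one pending item per key (collection); re-enqueueing a pending key replaces
// its item but keeps its place in line (an assumed ordering policy).
type uniqueQueue struct {
	order []string
	items map[string]interface{}
}

func newUniqueQueue(capacity int) *uniqueQueue {
	return &uniqueQueue{
		order: make([]string, 0, capacity),
		items: make(map[string]interface{}, capacity),
	}
}

func (q *uniqueQueue) Enqueue(key string, item interface{}) {
	if _, pending := q.items[key]; !pending {
		q.order = append(q.order, key)
	}
	q.items[key] = item // the newest response wins
}

func (q *uniqueQueue) Dequeue() (string, interface{}, bool) {
	if len(q.order) == 0 {
		return "", nil, false
	}
	key := q.order[0]
	q.order = q.order[1:]
	item := q.items[key]
	delete(q.items, key)
	return key, item, true
}

func main() {
	q := newUniqueQueue(2)
	q.Enqueue("virtualservices", "v1")
	q.Enqueue("gateways", "v1")
	q.Enqueue("virtualservices", "v2") // replaces the pending v1
	for {
		key, item, ok := q.Dequeue()
		if !ok {
			break
		}
		fmt.Println(key, item)
	}
}
```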

func (con *connection) receive() {
    defer close(con.requestC)
    for {
        // receive the next message
        req, err := con.stream.Recv()
        if err != nil {
            if err == io.EOF {
                scope.Infof("MCP: connection %v: TERMINATED %q", con, err)
                return
            }
            con.reporter.RecordRecvError(err, status.Code(err))
            scope.Errorf("MCP: connection %v: TERMINATED with errors: %v", con, err)
            con.reqError = err
            return
        }
        select {
        // forward it into the channel con.requestC
        case con.requestC <- req:
        case <-con.queue.Done():
            scope.Debugf("MCP: connection %v: stream done", con)
            return
        case <-con.stream.Context().Done():
            scope.Debugf("MCP: connection %v: stream done, err=%v", con, con.stream.Context().Err())
            return
        }
    }
}

So every request received from the client is put into the channel con.requestC. Back in ProcessStream, each request taken from con.requestC is handled by calling processClientRequest.

2.3 processClientRequest

func (con *connection) processClientRequest(req *mcp.RequestResources) error {
    if isTriggerResponse(req) {
        return nil
    }

    collection := req.Collection

    con.reporter.RecordRequestSize(collection, con.id, internal.ProtoSize(req))
    // look up the watch for this collection
    w, ok := con.watches[collection]
    if !ok {
        return status.Errorf(codes.InvalidArgument, "unsupported collection %q", collection)
    }
    if req.ResponseNonce == "" || w.pending.GetNonce() == req.ResponseNonce {
        versionInfo := ""

        if w.pending == nil {
            // initial request (first send)
            scope.Infof("MCP: connection %v: inc=%v WATCH for %v", con, req.Incremental, collection)
        } else {
            // ACK or NACK (second send)
            versionInfo = w.pending.SystemVersionInfo
            if req.ErrorDetail != nil {
                scope.Warnf("MCP: connection %v: NACK collection=%v version=%q with nonce=%q error=%#v inc=%v", // nolint: lll
                    con, collection, req.ResponseNonce, versionInfo, req.ErrorDetail, req.Incremental)
                con.reporter.RecordRequestNack(collection, con.id, codes.Code(req.ErrorDetail.Code))
            } else {
                scope.Infof("MCP: connection %v ACK collection=%v with version=%q nonce=%q inc=%v",
                    con, collection, versionInfo, req.ResponseNonce, req.Incremental)
                con.reporter.RecordRequestAck(collection, con.id)

                internal.UpdateResourceVersionTracking(w.ackedVersionMap, w.pending)
            }

            // clear the pending request after we finished processing the corresponding response.
            w.pending = nil
        }

        if w.cancel != nil {
            w.cancel()
        }

        sr := &Request{
            SinkNode:    req.SinkNode,
            Collection:  collection,
            VersionInfo: versionInfo,
            incremental: req.Incremental,
        }
        // con.watcher is the snapshot cache; its Watch method assembles
        // the response and calls queueResponse to enqueue it
        w.cancel = con.watcher.Watch(sr, con.queueResponse, con.peerAddr)
    } else {
        ...
    }
    return nil
}

func (con *connection) queueResponse(resp *WatchResponse) {
    if resp == nil {
        con.queue.Close()
    } else {
        con.queue.Enqueue(resp.Collection, resp)
    }
}

For MCP itself, see https://github.com/istio/api/tree/master/mcp. The diagram below helps illustrate it.

mcp.png

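From the diagram above and processClientRequest we can see:
1. The first request is sent by the client; after that, content is pushed from the server to the client.
2. The cancel function of the collection's watch object is updated.
3. Note the role of the w.pending variable.
4. The response is assembled in snapshot and placed into con.queue.

We first analyze the flow in which the client sends a request, the server returns a response, and the client sends an ACK. We then analyze how the server actively pushes information to the client and how the client returns an ACK.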
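The branching in processClientRequest depends only on three things: the request's nonce, w.pending, and ErrorDetail. The following Go sketch isolates that decision. Resources, watch and classify are simplified, hypothetical names. GetNonce is nil-safe like protobuf-generated getters, which is why the check also works before anything has been pushed. The branch elided as `...` in the excerpt is deliberately not modeled and is only labeled MISMATCH.

```go
package main

import "fmt"

// Resources keeps only the fields of mcp.Resources that the check needs.
type Resources struct {
	Nonce             string
	SystemVersionInfo string
}

// GetNonce is nil-safe, like protobuf-generated getters.
func (r *Resources) GetNonce() string {
	if r == nil {
		return ""
	}
	return r.Nonce
}

type watch struct{ pending *Resources }

// classify is a hypothetical helper naming what processClientRequest does
// with an incoming request for one collection.
func classify(w *watch, responseNonce string, hasError bool) string {
	if responseNonce == "" || w.pending.GetNonce() == responseNonce {
		switch {
		case w.pending == nil:
			return "WATCH" // first request for this collection
		case hasError:
			return "NACK"
		default:
			return "ACK" // the real code updates ackedVersionMap here
		}
	}
	return "MISMATCH" // handled by the elided else-branch; not modeled
}

func main() {
	w := &watch{}
	fmt.Println(classify(w, "", false)) // WATCH
	w.pending = &Resources{Nonce: "1", SystemVersionInfo: "v1"}
	fmt.Println(classify(w, "1", false)) // ACK
	fmt.Println(classify(w, "1", true))  // NACK
	fmt.Println(classify(w, "0", false)) // MISMATCH
}
```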

Now look at snapshot's Watch method:

func (c *Cache) Watch(request *source.Request, pushResponse source.PushResponseFunc, peerAddr string) source.CancelWatchFunc { // nolint: lll
    group := c.groupIndex(request.Collection, request.SinkNode)
    c.mu.Lock()
    defer c.mu.Unlock()
    // update status
    info := c.fillStatus(group, request, peerAddr)
    collection := request.Collection
    // return an immediate response if a snapshot is available and the
    // requested version doesn't match.
    // c.snapshots is updated in the SetSnapshot method
    if snapshot, ok := c.snapshots[group]; ok {

        version := snapshot.Version(request.Collection)
        scope.Debugf("Found snapshot for group: %q for %v @ version: %q",
            group, request.Collection, version)

        if version != request.VersionInfo {
            scope.Debugf("Responding to group %q snapshot:\n%v\n", group, snapshot)
            response := &source.WatchResponse{
                Collection: request.Collection,
                Version:    version,
                Resources:  snapshot.Resources(request.Collection),
                Request:    request,
            }
            // put the response into con.queue
            pushResponse(response)
            return nil
        }
        info.synced[request.Collection][peerAddr] = true
    }
    c.watchCount++
    watchID := c.watchCount
    ...
    info.mu.Lock()
    // register the watch
    info.watches[watchID] = &responseWatch{request: request, pushResponse: pushResponse}
    info.mu.Unlock()
    ...
    return cancel
}

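1. If the versions differ, pushResponse puts the response into con.queue, and from there it is sent to the client.
2. If there is no snapshot yet, or the version has not changed, the request is recorded in info.watches instead. The server will push to the client later, from the SetSnapshot method.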
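Both cases can be captured in a small sketch of Watch. It is a simplification: it keeps a single snapshot instead of one per group and omits the status bookkeeping. The cancel function body is our assumption, since the excerpt elides it. The cache, snapshot and responseWatch types are hypothetical stand-ins for snapshot.Cache and its internals.

```go
package main

import (
	"fmt"
	"sync"
)

type Request struct {
	Collection  string
	VersionInfo string
}

type WatchResponse struct {
	Collection string
	Version    string
	Resources  []string
}

// snapshot is a hypothetical single-group snapshot: version and resources per collection.
type snapshot struct {
	versions  map[string]string
	resources map[string][]string
}

type responseWatch struct {
	request *Request
	push    func(*WatchResponse)
}

// cache models only what Watch touches in snapshot.Cache.
type cache struct {
	mu         sync.Mutex
	snap       *snapshot
	watchCount int64
	watches    map[int64]*responseWatch
}

// Watch answers immediately when the client's version is stale; otherwise it
// parks the request so that a later SetSnapshot can push to it.
func (c *cache) Watch(req *Request, push func(*WatchResponse)) (cancel func()) {
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.snap != nil {
		if v := c.snap.versions[req.Collection]; v != req.VersionInfo {
			push(&WatchResponse{Collection: req.Collection, Version: v, Resources: c.snap.resources[req.Collection]})
			return nil // nothing parked, so nothing to cancel
		}
	}
	c.watchCount++
	id := c.watchCount
	c.watches[id] = &responseWatch{request: req, push: push}
	return func() { // assumed cancel: forget the parked watch
		c.mu.Lock()
		defer c.mu.Unlock()
		delete(c.watches, id)
	}
}

func main() {
	c := &cache{watches: map[int64]*responseWatch{}, snap: &snapshot{
		versions:  map[string]string{"gateways": "v2"},
		resources: map[string][]string{"gateways": {"gw-a"}},
	}}
	show := func(r *WatchResponse) { fmt.Println("push", r.Collection, r.Version, r.Resources) }
	c.Watch(&Request{Collection: "gateways", VersionInfo: "v1"}, show)           // stale: immediate push
	cancel := c.Watch(&Request{Collection: "gateways", VersionInfo: "v2"}, show) // current: parked
	fmt.Println("parked watches:", len(c.watches))
	cancel()
	fmt.Println("parked watches:", len(c.watches))
}
```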

2.4 pushServerResponse

func (con *connection) pushServerResponse(w *watch, resp *WatchResponse) error {
    ...
    if incremental {
        added, removed = calculateDelta(resp.Resources, w.ackedVersionMap)
    } else {
        // resp.Resources is the content of the snapshot
        for _, resource := range resp.Resources {
            added = append(added, *resource)
        }
    }
    msg := &mcp.Resources{
        SystemVersionInfo: resp.Version,
        Collection:        resp.Collection,
        Resources:         added,
        RemovedResources:  removed,
        Incremental:       incremental,
    }
    // increment nonce
    con.streamNonce++
    msg.Nonce = strconv.FormatInt(con.streamNonce, 10)
    if err := con.stream.Send(msg); err != nil {
        con.reporter.RecordSendError(err, status.Code(err))
        return err
    }
    scope.Debugf("MCP: connection %v: SEND collection=%v version=%v nonce=%v inc=%v",
        con, resp.Collection, resp.Version, msg.Nonce, msg.Incremental)
    // after a successful send to the client, set w.pending;
    // it is used to validate the client's later ACK/NACK
    w.pending = msg
    return nil
}

1. The response is assembled into an mcp.Resources message and sent to the client.
2. After a successful send, w.pending is set. When the client sends an ACK/NACK, the server checks against it in processClientRequest.
On an ACK, UpdateResourceVersionTracking(w.ackedVersionMap, w.pending) updates w.ackedVersionMap, which records what the client currently holds.
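The article shows calculateDelta only at its call site and does not quote UpdateResourceVersionTracking. The following sketch gives one plausible reading of these two steps for a single collection. First, compute the added and removed resources against the ACKed view. Then update that view when the client ACKs. The Resource type and the helper names deltaAgainstAcked and trackAck are hypothetical, and both bodies are assumptions rather than Istio's code. In particular, the full-replace behavior for non-incremental ACKs is assumed.

```go
package main

import (
	"fmt"
	"sort"
)

// Resource is a hypothetical stand-in carrying just a name and a version.
type Resource struct {
	Name    string
	Version string
}

// deltaAgainstAcked: new or changed resources are "added"; ACKed names that
// are missing from the snapshot are "removed".
func deltaAgainstAcked(current []Resource, acked map[string]string) (added []Resource, removed []string) {
	seen := make(map[string]bool, len(current))
	for _, r := range current {
		seen[r.Name] = true
		if v, ok := acked[r.Name]; !ok || v != r.Version {
			added = append(added, r)
		}
	}
	for name := range acked {
		if !seen[name] {
			removed = append(removed, name)
		}
	}
	sort.Strings(removed) // map iteration order is random; sort for stable output
	return added, removed
}

// trackAck updates the ACKed view once the client ACKs a pushed message:
// a full push replaces the view, an incremental one patches it.
func trackAck(acked map[string]string, added []Resource, removed []string, incremental bool) {
	if !incremental {
		for name := range acked {
			delete(acked, name)
		}
	}
	for _, name := range removed {
		delete(acked, name)
	}
	for _, r := range added {
		acked[r.Name] = r.Version
	}
}

func main() {
	acked := map[string]string{"vs-a": "1", "vs-b": "1"}
	current := []Resource{{"vs-a", "2"}, {"vs-c", "1"}}
	added, removed := deltaAgainstAcked(current, acked)
	fmt.Println("added:", added, "removed:", removed)
	trackAck(acked, added, removed, true) // the client ACKed the incremental push
	fmt.Println("acked view:", acked)
}
```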

2.5 Summary

Now let's walk through the whole flow.

mcp.png

1. The client sends the first request.
2. The server sends data to the client.
3. The client replies with an ACK/NACK, and the server acts on this feedback. For example, on an ACK it updates w.ackedVersionMap.

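After that, the server actively sends data to the client. When does that happen? The answer ties back to [istio source analysis][galley] Galley runtime. That article showed that the upstream source delivers data as events to the runtime, which processes them and hands the result to p.distributor. From the Start() method we know that p.distributor is the snapshot cache.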

func NewProcessing(a *settings.Args) *Processing {
    d := snapshot.New(groups.IndexFunction)
    return &Processing{
        args:         a,
        distributor:  d,
        configzTopic: configz.CreateTopic(d),
    }
}
func (p *Processing) Start() (err error) {
    ...
    p.processor = runtime.NewProcessor(src, p.distributor, &processorCfg)
    ...
    return nil
}

p.distributor receives each new snapshot through SetSnapshot:

// galley/pkg/runtime/processor.go
func (p *Processor) Start() error {
    ...
        case <-p.stateStrategy.Publish:
            scope.Debug("Processor.process: publish")
            // build a snapshot of the objects the state object currently holds in memory
            s := p.state.buildSnapshot()
            // hand the snapshot over to the distributor
            p.distributor.SetSnapshot(groups.Default, s)
        }
    ...
}
// pkg/mcp/snapshot/snapshot.go
func (c *Cache) SetSnapshot(group string, snapshot Snapshot) {
    c.mu.Lock()
    defer c.mu.Unlock()

    // update the existing entry
    c.snapshots[group] = snapshot

    // trigger existing watches for which version changed
    if info, ok := c.status[group]; ok {
        info.mu.Lock()
        defer info.mu.Unlock()
        // iterate over all registered watches
        for id, watch := range info.watches {
            version := snapshot.Version(watch.request.Collection)
            if version != watch.request.VersionInfo {
                scope.Infof("SetSnapshot(): respond to watch %d for %v @ version %q",
                    id, watch.request.Collection, version)

                response := &source.WatchResponse{
                    Collection: watch.request.Collection,
                    Version:    version,
                    Resources:  snapshot.Resources(watch.request.Collection),
                    Request:    watch.request,
                }
                // call the push callback, which puts the response
                // into con.queue to be sent to the client
                watch.pushResponse(response)

                // discard the responseWatch
                delete(info.watches, id)

                scope.Debugf("SetSnapshot(): watch %d for %v @ version %q complete",
                    id, watch.request.Collection, version)
            }
        }
    }
}

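Where do info.watches come from? They are registered in snapshot's Watch method, which runs again every time the client sends an ACK/NACK. So whenever an upstream event produces a new snapshot, SetSnapshot is triggered and pushes the information to the client.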
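The push path triggered by a new snapshot can be sketched as follows: store the new versions, fire every parked watch whose collection version changed, and drop it. The single-group cache, the version-only snapshot and the type names are hypothetical simplifications of the SetSnapshot excerpt. Because a fired watch is removed, the server pushes again only after the client's next ACK/NACK has registered a new watch through Watch.

```go
package main

import (
	"fmt"
	"sync"
)

type Request struct {
	Collection  string
	VersionInfo string
}

type WatchResponse struct {
	Collection string
	Version    string
}

type responseWatch struct {
	request *Request
	push    func(*WatchResponse)
}

// cache holds one version per collection plus the parked watches (single group).
type cache struct {
	mu       sync.Mutex
	versions map[string]string
	watches  map[int64]*responseWatch
}

// SetSnapshot stores the new versions and fires every parked watch whose
// collection version changed, deleting it afterwards.
func (c *cache) SetSnapshot(versions map[string]string) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.versions = versions
	for id, w := range c.watches {
		if v := versions[w.request.Collection]; v != w.request.VersionInfo {
			w.push(&WatchResponse{Collection: w.request.Collection, Version: v})
			delete(c.watches, id) // deleting during range is safe in Go
		}
	}
}

func main() {
	show := func(r *WatchResponse) { fmt.Println("push", r.Collection, r.Version) }
	c := &cache{watches: map[int64]*responseWatch{
		1: {request: &Request{Collection: "gateways", VersionInfo: "v1"}, push: show},
		2: {request: &Request{Collection: "virtualservices", VersionInfo: "v1"}, push: show},
	}}
	c.SetSnapshot(map[string]string{"gateways": "v2", "virtualservices": "v1"})
	fmt.Println("still parked:", len(c.watches))
}
```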

3. References

1. istio 1.3.6 source code
2. https://cloud.tencent.com/developer/article/1409159
