自留地

k8s watch rest api

2017-09-27  本文已影响0人  ywhu

k8s rest api对rc、svc、ingress、pod、deployment等都提供的watch接口,可以实时的监听应用部署状态。

在此之前简单先说一下http长连接

分块传输编码(Chunked transfer encoding)

超文本传输协议(HTTP)中的一种数据传输机制,允许HTTP由应用服务器发送给客户端应用( 通常是网页浏览器)的数据可以分成多个部分。分块传输编码只在HTTP协议1.1版本(HTTP/1.1)中提供。
通常,HTTP应答消息中发送的数据是整个发送的,Content-Length消息头字段表示数据的长度。数据的长度很重要,因为客户端需要知道哪里是应答消息的结束,以及后续应答消息的开始。然而,使用分块传输编码,数据分解成一系列数据块,并以一个或多个块发送,这样服务器可以发送数据而不需要预先知道发送内容的总大小。通常数据块的大小是一致的,但也不总是这种情况。

Transfer-Encoding

消息首部指明了将 entity 安全传递给用户所采用的编码形式。

Transfer-Encoding 是一个逐跳传输消息首部,即仅应用于两个节点之间的消息传递,而不是所请求的资源本身。一个多节点连接中的每一段都可以应用不同的Transfer-Encoding 值。如果你想要将压缩后的数据应用于整个连接,那么请使用端到端传输消息首部 Content-Encoding 。

当这个消息首部出现在 HEAD 请求的响应中,而这样的响应没有消息体,那么它其实指的是应用在相应的 GET 请求的应答的值。

Header type Response header
Forbidden header name   yes

语法

Transfer-Encoding: chunked
Transfer-Encoding: compress
Transfer-Encoding: deflate
Transfer-Encoding: gzip
Transfer-Encoding: identity

// Several values can be listed, separated by a comma
Transfer-Encoding: gzip, chunked

指令

chunked

数据以一系列分块的形式进行发送。 Content-Length 首部在这种情况下不被发送。。在每一个分块的开头需要添加当前分块的长度,以十六进制的形式表示,后面紧跟着 '\r\n' ,之后是分块本身,后面也是'\r\n' 。终止块是一个常规的分块,不同之处在于其长度为0。终止块后面是一个挂载(trailer),由一系列(或者为空)的实体消息首部构成。

compress

采用 Lempel-Ziv-Welch (LZW) 压缩算法。这个名称来自UNIX系统的 compress 程序,该程序实现了前述算法。
与其同名程序已经在大部分UNIX发行版中消失一样,这种内容编码方式已经被大部分浏览器弃用,部分因为专利问题(这项专利在2003年到期)。

deflate

采用 zlib 结构 (在 RFC 1950 中规定),和 deflate 压缩算法(在 RFC 1951 中规定)。

gzip

表示采用 Lempel-Ziv coding (LZ77) 压缩算法,以及32位CRC校验的编码方式。这个编码方式最初由 UNIX 平台上的 gzip 程序采用。处于兼容性的考虑, HTTP/1.1 标准提议支持这种编码方式的服务器应该识别作为别名的 x-gzip 指令。
identity
用于指代自身(例如:未经过压缩和修改)。除非特别指明,这个标记始终可以被接受。
示例

分块编码

分块编码主要应用于如下场景,即要传输大量的数据,但是在请求在没有被处理完之前响应的长度是无法获得的。例如,当需要用从数据库中查询获得的数据生成一个大的HTML表格的时候,或者需要传输大量的图片的时候。一个分块响应形式如下:

HTTP/1.1 200 OK 
Content-Type: text/plain 
Transfer-Encoding: chunked

7\r\n
Mozilla\r\n 
9\r\n
Developer\r\n
7\r\n
Network\r\n
0\r\n 
\r\n

HTTP 1.1引入分块传输编码提供了以下几点好处:

一般情况HTTP的Header包含Content-Length域来指明报文体的长度。有时候服务生成HTTP回应是无法确定消息大小的,比如大文件的下载,或者后台需要复杂的逻辑才能全部处理页面的请求,这时用需要实时生成消息长度,服务器一般使用chunked编码

原理

k8s提供的watch功能是建立在对etcd的watch之上的,当etcd的key-value出现变化时,会通知kube-apiserver,这里的Key-vlaue其实就是k8s资源的持久化。

早期的k8s架构中,kube-apiserver、kube-controller-manager、kube-scheduler、kubelet、kube-proxy,都是直接去watch etcd的,这样就造成etcd的连接数太大(节点成千上万时),对etcd压力太大,浪费资源,因此到了后面,只有kube-apiserver去watch etcd,而kube-apiserver对外提供watch api,也就是kube-controller-manager、kube-scheduler、kubelet、kube-proxy去watch kube-apiserver,这样大大减小了etcd的压力

Watch API

通过k8s 官网 rest api的描述,可以看到,Watch API实际上一个标准的HTTP GET请求,我们以Pod的Watch API为例

HTTP Request

GET /api/v1/watch/namespaces/{namespace}/pods
Path Parameters

Parameter   Description
namespace   object name and auth scope, such as for teams and projects
Query Parameters

Parameter   Description
fieldSelector   A selector to restrict the list of returned objects by their fields. Defaults to everything.
labelSelector   A selector to restrict the list of returned objects by their labels. Defaults to everything.
pretty  If ‘true’, then the output is pretty printed.
resourceVersion When specified with a watch call, shows changes that occur after that particular version of a resource. Defaults to changes from the beginning of history. When specified for list: - if unset, then the result is returned from remote storage based on quorum-read flag; - if it’s 0, then we simply return what we currently have in cache, no guarantee; - if set to non zero, then the result is at least as fresh as given rv.
timeoutSeconds  Timeout for the list/watch call.
watch   Watch for changes to the described resources and return them as a stream of add, update, and remove notifications. Specify resourceVersion.
Response

Code    Description
200 WatchEvent  OK

从上面可以看出Watch其实就是一个GET请求,和一般请求不同的是,它有一个watch的query parameter,也就是kube-apiserver接到这个请求,当发现query parameter里面包含watch,就知道这是一个Watch API,watch参数默认为true。

==返回值是200和WatchEvent。apiserver首先会返回一个200的状态码,建立长连接,然后不断的返回watch event==

服务器端机制

通过watch api涉及到的http源码分析,可以看到,watch支持http长连接和websocket两种方式

func (s *WatchServer) ServeHTTP(w http.ResponseWriter, req *http.Request) {
    w = httplog.Unlogged(w)

    if wsstream.IsWebSocketRequest(req) {
        w.Header().Set("Content-Type", s.MediaType)
        websocket.Handler(s.HandleWS).ServeHTTP(w, req)
        return
    }
    ...
    framer := s.Framer.NewFrameWriter(w)
    ...
    e := streaming.NewEncoder(framer, s.Encoder)
    ...
    // begin the stream
    w.Header().Set("Content-Type", s.MediaType)
    w.Header().Set("Transfer-Encoding", "chunked")
    w.WriteHeader(http.StatusOK)
    flusher.Flush()

    var unknown runtime.Unknown
    internalEvent := &metav1.InternalEvent{}
    buf := &bytes.Buffer{}
    ch := s.Watching.ResultChan()
    for {
        select {
        case <-cn.CloseNotify():
            return
        case <-timeoutCh:
            return
        case event, ok := <-ch:
            if !ok {
                // End of results.
                return
            }

            obj := event.Object
            s.Fixup(obj)
            if err := s.EmbeddedEncoder.Encode(obj, buf); err != nil {
                // unexpected error
                utilruntime.HandleError(fmt.Errorf("unable to encode watch object: %v", err))
                return
            }

            // ContentType is not required here because we are defaulting to the serializer
            // type
            unknown.Raw = buf.Bytes()
            event.Object = &unknown

            // the internal event will be versioned by the encoder
            *internalEvent = metav1.InternalEvent(event)
            if err := e.Encode(internalEvent); err != nil {
                utilruntime.HandleError(fmt.Errorf("unable to encode watch object: %v (%#v)", err, e))
                // client disconnect.
                return
            }
            if len(ch) == 0 {
                flusher.Flush()
            }

            buf.Reset()
        }
    }
}

每次调用watch API,kube-apiserver都会建立一个WatchServer,WatchServer通过channel会从etcd里面获取资源的watch event,中间经过一系列的处理(事件的广播)。然后WatchServer通过ServeHTTP将事件发送给client,我们就详细看看ServerHTTP的处理逻辑

首先会查看发送来的请求是不是要求使用websockt,即wsstream.IsWebSocketRequest(req),假如是的话就通过websocket向client发送watch event,也就是说kube-apiserver是支持通过websocket向客户端发送watch event的。

假如不是的话,则首先设置http返回头,Content-Type设置为s.MediaType,一般为json,同时设置Transfer-Encoding为chunked,设置返回码为200(StatusOK),和我们从API分析那一节获取的信息一样,首先会返回一个200的状态吗。

这里比较有意思的是,将Transfer-Encoding设置为chunked,这是http1.1中支持的协议,它会建立一个长连接,同时可以不停的发送数据块,发送数据块的格式是,它首先会发送一个数据块的长度,加上回车符(/r/n),接着发送相应的数据块内容。假如数据块长度为0,则代表数据发送完成,连接断开。显然这里的watch event就是一个个数据块。

w.Header().Set("Content-Type", s.MediaType)
w.Header().Set("Transfer-Encoding", "chunked")
w.WriteHeader(http.StatusOK)

看看接下里的for循环,首先从channel里面获取event

event, ok := <-ch

然后序列化数据

obj := event.Object
s.EmbeddedEncoder.Encode(obj, buf)
unknown.Raw = buf.Bytes()
event.Object = &unknown

最后将数据发送出去

*internalEvent = metav1.InternalEvent(event)
e.Encode(internalEvent)

具体实现

考虑的http长连接的资源消耗与性能问题,实现pod监听是采用的ws协议

同时在服务多实例时涉及到pod状态合并的问题

例如:通过deployment部署3个pod示例,在3个pod都起来时,deployment才算正常,在3个pod都删除是,deployment才算下线。并且一些java应用启动的时间可能较长,采用livenessprobe探针健康检查是需要探测多次才可探测到启动成功,这个会收到多条modify的消息,诸如此类的情况都要考虑,可根据监控需求具体实现

//定义自己msg消息结构体
type msg struct {
    
}

//存储pod状态
var cache *CacheStatus

var msgChan = make(chan msg, 1024)

//ws协议监听
func WatchPod() {
    
    //ReconnWs为自己封装websocket工具包
    reconnWs := new(ReconnWs)

    u, _ := url.Parse("ws://" + k8surl + "/api/v1/watch/pods?watch=true&pretty=true")

    reconnWs.Dial(u, http.Header{
        "Origin": []string{"http://" + k8surl + "/"},
    })

    done := make(chan struct{})

    //监听信息处理函数
    go func() {
        for {
            m := <-msgChan
            //消息处理函数
            go handler()
        }
    }()

    //pod监听
    go func() {

        if cache == nil {
            cache = NewCache()
        }

        defer func() {
            reconnWs.Close()
            close(done)
        }()
        for {

            var event PodStatus

            _, message, err := reconnWs.ReadMessage()
            if err != nil {
                log.Println("read:", err)
                continue
            }

            if err := jsoniter.Unmarshal(message, &event); err != nil {
                log.Println(err)
                continue
            }

            switch event.Type {

            case Added:
            

            case Deleted:

                

            case Modified:

                

            case Error:


            }

            m := msg{
                //填入获取的状态信息
            }

            msgChan <- m

        }
    }()

    interrupt := make(chan os.Signal, 1)
    signal.Notify(interrupt, os.Interrupt)
    for {

        defer func() {
            Close()
        }()

        select {
        case <-interrupt:
            select {
            case <-done:
            case <-time.After(time.Second):
            }
            return
        }
    }
}


//资源回收
func Close() {
    
}

cache采用的是map结构存储pod状态,getStatuInfo()函数是监控程序启动的时候初始化服务状态数据的函数,可以选择每次启动都从k8s查询一遍,根据自己定义的监听规则填入pod状态,也可以在每次接受到消息时都存入持久化存储(mysql、redis等),每次启动再从持久化存储中查询以及初始化已存在的cache数据

type CacheStatus struct {
    lock *sync.RWMutex
    bm   map[string]Status
}

type Status struct {
    ReadyReplicas    int
    Replicas         int
    Event            string
    Phase            string
    Errmsg           string
    CreatingReplicas int
}

func NewCache() *CacheStatus {

    return &CacheStatus{
        lock: new(sync.RWMutex),
        bm:   getStatuInfo(),
    }

}

func (m *CacheStatus) IsExist(k string) (isExist bool) {

    m.lock.Lock()
    defer m.lock.Unlock()

    if _, ok := m.bm[k]; ok {
        isExist = true
    }

    return
}

func (m *CacheStatus) Get(k string) (status Status) {
    m.lock.RLock()
    defer m.lock.RUnlock()
    if status, ok := m.bm[k]; ok {
        return status
    }
    return
}

func (m *CacheStatus) Set(k string, v Status) {
    m.lock.Lock()
    defer m.lock.Unlock()

    m.bm[k] = v

}

func (m *CacheStatus) Check(k string) bool {
    m.lock.RLock()
    defer m.lock.RUnlock()
    if _, ok := m.bm[k]; !ok {
        return false
    }
    return true
}
func (m *CacheStatus) Delete(k string) {
    m.lock.Lock()
    defer m.lock.Unlock()
    delete(m.bm, k)
}

//range map
func (m *CacheStatus) Each(cb func(string, Status)) {
    m.lock.RLock()
    defer m.lock.RUnlock()
    for k, v := range m.bm {
        cb(k, v)
    }
}

func (m *CacheStatus) String() string {
    str, _ := jsoniter.MarshalToString(m.bm)
    return str
}


func getStatuInfo() (cacheInfo map[string]Status) {

    ....
    
    return
}

上一篇下一篇

猜你喜欢

热点阅读