k8s watch源码简介
简单总结
通过Chunked transfer encoding来做到服务端给客户端推送数据,达到watch的效果
client发送watch这个请求后,服务端返回响应,带上Header Transfer-Encoding:chunked,客户端和服务端的连接仍会保持,接着服务端就可以推送相应的事件给客户端了
server的resp内的writer是经过chunkwriter包装的,如果是chunk模式,会在数据前添加数据长度加\r\n(例如发送hello,那么实际发送是5\r\nhello\r\n)
客户端读取chunkline,也就是5\r\n,知道长度是5后,读取长度+2的数据,也就是hello\r\n,实际数据是hello
顺道提一下informer,拿pod举例
Informer 先通过Reflector先List 获得所有的 Pods
Reflect 拿到全部 Pod 放到 Store 中(如果调用Informer的List则从Store中取)
List完后,Reflector 开始 Watch Pod相关 的所有事件
收到后Reflector将事件发送到 DeltaFIFO,
Controller 收到这个事件,会触发 Processor 的回调函数,先记录到一个clientState的cache结构中,然后调用用户传入的ResourceEventHandler
reflector会周期性的resync,从clientState合规cache结构取出然后塞入DeltaFIFO
相关源码
拿pod watch举例
client-go相对路径
kubernetes/typed/core/v1/pod.go中
func (c *pods) Watch(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) {
var timeout time.Duration
if opts.TimeoutSeconds != nil {
timeout = time.Duration(*opts.TimeoutSeconds) * time.Second
}
opts.Watch = true
return c.client.Get().
Namespace(c.ns).
Resource("pods").
VersionedParams(&opts, scheme.ParameterCodec).
Timeout(timeout).
Watch(ctx)
}
rest/request.go中
func (r *Request) Watch(ctx context.Context) (watch.Interface, error) {
...
resp, err := client.Do(req)
updateURLMetrics(ctx, r, resp, err)
if r.c.base != nil {
if err != nil {
r.backoff.UpdateBackoff(r.c.base, err, 0)
} else {
r.backoff.UpdateBackoff(r.c.base, err, resp.StatusCode)
}
}
if err == nil && resp.StatusCode == http.StatusOK {
return r.newStreamWatcher(resp)
}
...
}
func (r *Request) newStreamWatcher(resp *http.Response) (watch.Interface, error) {
contentType := resp.Header.Get("Content-Type")
mediaType, params, err := mime.ParseMediaType(contentType)
if err != nil {
klog.V(4).Infof("Unexpected content type from the server: %q: %v", contentType, err)
}
objectDecoder, streamingSerializer, framer, err := r.c.content.Negotiator.StreamDecoder(mediaType, params)
if err != nil {
return nil, err
}
handleWarnings(resp.Header, r.warningHandler)
frameReader := framer.NewFrameReader(resp.Body)
watchEventDecoder := streaming.NewDecoder(frameReader, streamingSerializer)
return watch.NewStreamWatcher(
restclientwatch.NewDecoder(watchEventDecoder, objectDecoder),
// use 500 to indicate that the cause of the error is unknown - other error codes
// are more specific to HTTP interactions, and set a reason
errors.NewClientErrorReporter(http.StatusInternalServerError, r.verb, "ClientWatchDecoding"),
), nil
}
func (r *Request) VersionedParams(obj runtime.Object, codec runtime.ParameterCodec) *Request {
return r.SpecificallyVersionedParams(obj, codec, r.c.content.GroupVersion)
}
func (r *Request) SpecificallyVersionedParams(obj runtime.Object, codec runtime.ParameterCodec, version schema.GroupVersion) *Request {
if r.err != nil {
return r
}
params, err := codec.EncodeParameters(obj, version)
if err != nil {
r.err = err
return r
}
for k, v := range params {
if r.params == nil {
r.params = make(url.Values)
}
r.params[k] = append(r.params[k], v...)
}
return r
}
kubernetes相对路径
vendor/k8s.io/apiserver/pkg/endpoints/handlers/get.go中
func ListResource(r rest.Lister, rw rest.Watcher, scope *RequestScope, forceWatch bool, minRequestTimeout time.Duration) http.HandlerFunc {
...
if opts.Watch || forceWatch {
watcher, err := rw.Watch(ctx, &opts)
if err != nil {
scope.err(err, w, req)
return
}
requestInfo, _ := request.RequestInfoFrom(ctx)
metrics.RecordLongRunning(req, requestInfo, metrics.APIServerComponent, func() {
serveWatch(watcher, scope, outputMediaType, req, w, timeout)
})
return
}
result, err := r.List(ctx, &opts)
if err != nil {
scope.err(err, w, req)
return
}
...
}
vendor/k8s.io/apiserver/pkg/endpoints/handlers/watch.go中
func serveWatch(watcher watch.Interface, scope *RequestScope, mediaTypeOptions negotiation.MediaTypeOptions, req *http.Request, w http.ResponseWriter, timeout time.Duration) {
...
server := &WatchServer{
Watching: watcher,
Scope: scope,
UseTextFraming: useTextFraming,
MediaType: mediaType,
Framer: framer,
Encoder: encoder,
EmbeddedEncoder: embeddedEncoder,
Fixup: func(obj runtime.Object) runtime.Object {
result, err := transformObject(ctx, obj, options, mediaTypeOptions, scope, req)
if err != nil {
utilruntime.HandleError(fmt.Errorf("failed to transform object %v: %v", reflect.TypeOf(obj), err))
return obj
}
// When we are transformed to a table, use the table options as the state for whether we
// should print headers - on watch, we only want to print table headers on the first object
// and omit them on subsequent events.
if tableOptions, ok := options.(*metav1.TableOptions); ok {
tableOptions.NoHeaders = true
}
return result
},
TimeoutFactory: &realTimeoutFactory{timeout},
}
server.ServeHTTP(w, req)
...
}
func (s *WatchServer) ServeHTTP(w http.ResponseWriter, req *http.Request) {
...
w.Header().Set("Content-Type", s.MediaType)
w.Header().Set("Transfer-Encoding", "chunked")
w.WriteHeader(http.StatusOK)
flusher.Flush()
for {
select {
case <-done:
return
case <-timeoutCh:
return
case event, ok := <-ch:
...
}
]
...
}
标准库相对路径
net/http/transfer.go中
func readTransfer(msg any, r *bufio.Reader) (err error) {
...
switch {
case t.Chunked:
if noResponseBodyExpected(t.RequestMethod) || !bodyAllowedForStatus(t.StatusCode) {
t.Body = NoBody
} else {
t.Body = &body{src: internal.NewChunkedReader(r), hdr: msg, r: r, closing: t.Close}
}
}
...
}
net/http/server.go中
func (c *conn) readRequest(ctx context.Context) (w *response, err error) {
...
w = &response{
conn: c,
cancelCtx: cancelCtx,
req: req,
reqBody: req.Body,
handlerHeader: make(Header),
contentLength: -1,
closeNotifyCh: make(chan bool, 1),
// We populate these ahead of time so we're not
// reading from req.Header after their Handler starts
// and maybe mutates it (Issue 14940)
wants10KeepAlive: req.wantsHttp10KeepAlive(),
wantsClose: req.wantsClose(),
}
if isH2Upgrade {
w.closeAfterReply = true
}
w.cw.res = w
w.w = newBufioWriterSize(&w.cw, bufferBeforeChunkingSize)
...
}
func (w *response) Write(data []byte) (n int, err error) {
return w.write(len(data), data, "")
}
func (w *response) WriteString(data string) (n int, err error) {
return w.write(len(data), nil, data)
}
// either dataB or dataS is non-zero.
func (w *response) write(lenData int, dataB []byte, dataS string) (n int, err error) {
if w.conn.hijacked() {
if lenData > 0 {
caller := relevantCaller()
w.conn.server.logf("http: response.Write on hijacked connection from %s (%s:%d)", caller.Function, path.Base(caller.File), caller.Line)
}
return 0, ErrHijacked
}
if w.canWriteContinue.isSet() {
// Body reader wants to write 100 Continue but hasn't yet.
// Tell it not to. The store must be done while holding the lock
// because the lock makes sure that there is not an active write
// this very moment.
w.writeContinueMu.Lock()
w.canWriteContinue.setFalse()
w.writeContinueMu.Unlock()
}
if !w.wroteHeader {
w.WriteHeader(StatusOK)
}
if lenData == 0 {
return 0, nil
}
if !w.bodyAllowed() {
return 0, ErrBodyNotAllowed
}
w.written += int64(lenData) // ignoring errors, for errorKludge
if w.contentLength != -1 && w.written > w.contentLength {
return 0, ErrContentLength
}
if dataB != nil {
return w.w.Write(dataB)
} else {
return w.w.WriteString(dataS)
}
}
func (cw *chunkWriter) Write(p []byte) (n int, err error) {
if !cw.wroteHeader {
cw.writeHeader(p)
}
if cw.res.req.Method == "HEAD" {
// Eat writes.
return len(p), nil
}
if cw.chunking {
_, err = fmt.Fprintf(cw.res.conn.bufw, "%x\r\n", len(p))
if err != nil {
cw.res.conn.rwc.Close()
return
}
}
n, err = cw.res.conn.bufw.Write(p)
if cw.chunking && err == nil {
_, err = cw.res.conn.bufw.Write(crlf)
}
if err != nil {
cw.res.conn.rwc.Close()
}
return
}
net/http/internal/chunked.go中
func (cr *chunkedReader) Read(b []uint8) (n int, err error) {
for cr.err == nil {
if cr.checkEnd {
if n > 0 && cr.r.Buffered() < 2 {
// We have some data. Return early (per the io.Reader
// contract) instead of potentially blocking while
// reading more.
break
}
if _, cr.err = io.ReadFull(cr.r, cr.buf[:2]); cr.err == nil {
if string(cr.buf[:]) != "\r\n" {
cr.err = errors.New("malformed chunked encoding")
break
}
} else {
if cr.err == io.EOF {
cr.err = io.ErrUnexpectedEOF
}
break
}
cr.checkEnd = false
}
if cr.n == 0 {
if n > 0 && !cr.chunkHeaderAvailable() {
// We've read enough. Don't potentially block
// reading a new chunk header.
break
}
cr.beginChunk()
continue
}
if len(b) == 0 {
break
}
rbuf := b
if uint64(len(rbuf)) > cr.n {
rbuf = rbuf[:cr.n]
}
var n0 int
n0, cr.err = cr.r.Read(rbuf)
n += n0
b = b[n0:]
cr.n -= uint64(n0)
// If we're at the end of a chunk, read the next two
// bytes to verify they are "\r\n".
if cr.n == 0 && cr.err == nil {
cr.checkEnd = true
} else if cr.err == io.EOF {
cr.err = io.ErrUnexpectedEOF
}
}
return n, cr.err
}