etcdGo

golang http2长链接问题

2020-06-28  本文已影响0人  凹大猫的鱼

前几天项目中遇到了一个长链接假死问题,服务端和client端采用的是h2c长连接。服务端作为sidecar部署在k8s的pod里面,当滚动升级pod的时候,client端和老的pod的连接一直存在,即使老的pod已经被删除了。(client和网关是一个东西)

发现问题

突然有一天前端同事说调用全部503(内部服务不可用),赶紧去环境上查看log,发现网关发送到后段的请求全部超时。用netstat查看连接状态也是没问题的。

网关端状态图

去新建的pod上查看,发现并没有连接,因此网关端的连接并不是和新建的pod建立的。

服务端状态图

解决问题

1.查看网关端代码

首先看一下网关测的代码:

    func NewH2cClient() *http.Client {
    client := &http.Client{
        Transport: &http2.Transport{
            AllowHTTP: true,
            DialTLS:   dialH2cTimeout,
        },
        Timeout: 10 * time.Second,
        }
        return client
    }

    func dialH2cTimeout(network, addr string, cfg *tls.Config) (net.Conn, error) {
       tylog.Infof("connect to remote address: %+v:", addr)
       return net.Dial(network, addr)
    }

client端的实现很简单,复用http.Client对象。在这里设置了TimeOut为10s,这个是发送超时时间。通过log也可以看到10s超时后返回了error,但连接并没有终端,查阅了一些net/http的文档服务端对于超时的控制还是比较细腻的。

服务端超时控制

但client端的控制就比较简陋(其实应该是net/http封装的东西太多,导致用户可直接操作的东西太少)

client端超时控制

从图中可以看出,client端不像服务端有读写超时设置,client端就一个Timeout,是从建立链接到接收到body的时间段。因此我们在client端可以做的东西貌似并不多。

2.回归问题

查阅了部分资料后,回到问题的所在点:

为什么服务端已经被删除,但长链接还存在??

因此我们需要看的就是client端的conn什么时候应该断开。http包中的读和写是分开的,分别对应的是两个goroutine,我们主要应该看的是读操作。

      func (cc *ClientConn) readLoop() {
         rl := &clientConnReadLoop{cc: cc}
         defer rl.cleanup() //这个会清理链接
         cc.readerErr = rl.run()
         if ce, ok := cc.readerErr.(ConnectionError); ok {
              cc.wmu.Lock()
              cc.fr.WriteGoAway(0, ErrCode(ce), nil)
              cc.wmu.Unlock()
         }
     }

上述代码是read的入口,我们看一下run这个函数的实现

func (rl *clientConnReadLoop) run() error {
    cc := rl.cc
    rl.closeWhenIdle = cc.t.disableKeepAlives() || cc.singleUse
    gotReply := false // ever saw a HEADERS reply
    gotSettings := false
    for {
        f, err := cc.fr.ReadFrame()
        if err != nil {
            cc.vlogf("http2: Transport readFrame error on conn %p: (%T) %v", cc, err, err)
        }
        if se, ok := err.(StreamError); ok {
            if cs := cc.streamByID(se.StreamID, false); cs != nil {
                cs.cc.writeStreamReset(cs.ID, se.Code, err)
                cs.cc.forgetStreamID(cs.ID)
                if se.Cause == nil {
                    se.Cause = cc.fr.errDetail
                }
                rl.endStreamError(cs, se)
            }
            continue
        } else if err != nil {
            return err
        }
        if VerboseLogs {
            cc.vlogf("http2: Transport received %s", summarizeFrame(f))
        }
        if !gotSettings {
            if _, ok := f.(*SettingsFrame); !ok {
                cc.logf("protocol error: received %T before a SETTINGS frame", f)
                return ConnectionError(ErrCodeProtocol)
            }
            gotSettings = true
        }
        maybeIdle := false // whether frame might transition us to idle

        switch f := f.(type) {
        case *MetaHeadersFrame:
            err = rl.processHeaders(f)
            maybeIdle = true
            gotReply = true
        case *DataFrame:
            err = rl.processData(f)
            maybeIdle = true
        case *GoAwayFrame:
            err = rl.processGoAway(f)
            maybeIdle = true
        case *RSTStreamFrame:
            err = rl.processResetStream(f)
            maybeIdle = true
        case *SettingsFrame:
            err = rl.processSettings(f)
        case *PushPromiseFrame:
            err = rl.processPushPromise(f)
        case *WindowUpdateFrame:
            err = rl.processWindowUpdate(f)
        case *PingFrame:
            err = rl.processPing(f)
        default:
            cc.logf("Transport: unhandled response frame type %T", f)
        }
        if err != nil {
            if VerboseLogs {
                cc.vlogf("http2: Transport conn %p received error from processing frame %v: %v", cc, summarizeFrame(f), err)
            }
            return err
        }
        if rl.closeWhenIdle && gotReply && maybeIdle {
            cc.closeIfIdle()
        }
    }
}

再看一下ReadFrame函数:

// ReadFrame reads a single frame. The returned Frame is only valid
// until the next call to ReadFrame.
//
// If the frame is larger than previously set with SetMaxReadFrameSize, the
// returned error is ErrFrameTooLarge. Other errors may be of type
// ConnectionError, StreamError, or anything else from the underlying
// reader.
func (fr *Framer) ReadFrame() (Frame, error) {
    fr.errDetail = nil
    if fr.lastFrame != nil {
        fr.lastFrame.invalidate()
    }
    fh, err := readFrameHeader(fr.headerBuf[:], fr.r)
    if err != nil {
        return nil, err
    }
    if fh.Length > fr.maxReadSize {
        return nil, ErrFrameTooLarge
    }
    payload := fr.getReadBuf(fh.Length)
    if _, err := io.ReadFull(fr.r, payload); err != nil {
        return nil, err
    }
    f, err := typeFrameParser(fh.Type)(fr.frameCache, fh, payload)
    if err != nil {
        if ce, ok := err.(connError); ok {
            return nil, fr.connError(ce.Code, ce.Reason)
        }
        return nil, err
    }
    if err := fr.checkFrameOrder(f); err != nil {
        return nil, err
    }
    if fr.logReads {
        fr.debugReadLoggerf("http2: Framer %p: read %v", fr, summarizeFrame(f))
    }
    if fh.Type == FrameHeaders && fr.ReadMetaHeaders != nil {
        return fr.readMetaFrame(f.(*HeadersFrame))
    }
    return f, nil
}

其实就是一直阻塞读取数据,只有在收到reset或者fin包(EOF)的时候才会退出,然后会执行前面的清理函数:

 defer rl.cleanup()

该函数会调用MarkDead函数清理conn关联的map

func (p *clientConnPool) MarkDead(cc *ClientConn) {
    p.mu.Lock()
    defer p.mu.Unlock()
    for _, key := range p.keys[cc] {
        vv, ok := p.conns[key]
        if !ok {
            continue
        }
        newList := filterOutClientConn(vv, cc)
        if len(newList) > 0 {
            p.conns[key] = newList
        } else {
            delete(p.conns, key)
        }
    }
    delete(p.keys, cc)
}

看到这基本就知道问题的所在了:

  1. 删除pod的时候服务端没有发送Fin包
  2. 服务端发送了Fin包,但网关没收到

3.验证问题所在

使用了tcpdump抓包,发现client端的确没有收到Fin包

client端抓包

从抓包信息看的确没有收到Fin包,服务端抓也没抓到,于是感觉像k8s升级的时候直接删掉了老的pod(类似直接delete),并没有发送kill命令。但集群权限是拿不到的,不太好认证这个猜想。

4.解决方案

因为服务端也是自己写的,因此可以设置deployment来设置pod更新时的动作,比如:

"lifecycle": {
              "preStop": {
                "httpGet": {
                  "path": "/hc.do?a=offline",
                  "scheme": "HTTP",
                  "port": 8809
                }
              }
            },

让服务监听8809端口,k8s在删除pod前会访问 http127.0.0.1:8809/hc.do?a=offline, 这时候在服务里面做优雅下线,关掉tcp链接,这个问题就不会发生了。

5.总结

net/http包虽然很容易实现http服务,但对于client端的确不够友好,至少我没找到什么方式可以直接去控制超时时间,或者管理conn连接池,当然我也可以在它基础上实现一个自己的连接池(ClientConnPool接口)但成本不低。所以很多人都会自己去实现。

对于pod删除时信号量的问题,因为没有权限,所以没法看pod滚动升级时,kublet是怎么删除老的节点的,按照我的理解应该是先给容器发kill -15, 如果没退出再kill -9, 但就目前情况来看服务端并没有捕捉到信号量(我加了捕捉信号量,但并未打印出来log),后面有机会再去深入研究一下k8s的那块代码。

上一篇下一篇

猜你喜欢

热点阅读