(三)gof 建立websocket连接

2021-03-07  本文已影响0人  公式般欢笑

本项目地址:gof 一个支持百万连接的websocket框架
本文提及的内容包含在:conn.go

一、新连接内容接收的流程

1、通过epoll对象获取新连接

我们需要通过epoll对象的wait方法来获取到客户端的连接,在第一次获取的时候,其内容是通过GlobalFd传输的,只有我们获取到连接句柄,并加入到epoll对象中之后,以后才可以接收到该连接自己发送的消息。

2、解析头消息内容

websocket连接在获取到连接之后,其首先发送的一定是头信息,如果没有头信息,我们可以直接关闭socket连接,拒绝客户端的访问。

3、回应客户端头消息

接收到客户端的header头信息之后,我们应该对其进行解析,并判断该header是否有用,并返回相应的数据,从而建立连接,进而通过连接去收发消息。

二、接收句柄消息

我们通过实例化EpollObj对象,可以得到一个Epoll模型,然后我们通过其中的wait方法可以接收句柄消息。
下面我们说一下第一种情况,也就是如果Epoll模型给我们的消息内容是GlobalFd传送过来的,这时候我们的整个接收流程。也就是我们上一章节中提到的,eWait方法中的handle

1、判断是否有新连接加入

// @Author WangKan
// @Description //当wait方法取到内容后,会回调此方法,对fd进行处理
// @Date 2021/2/2 21:39
func (s *Server) handler(fd int, connType ConnStatus) {
    switch connType {
    case CONN_NEW:
        newFd := s.addConn(fd)
        //Upgrader to http header
        s.handShaker(newFd)
        //s.messageChan<-newFd
    case CONN_MESSAGE:
        Log.Info("接收到描述符为%v的消息", fd)
        c, ok := s.conns.Load(fd)
        if !ok {
            Log.Info("描述符fd 为 %d 的s.conns 不存在!", fd)
            return
        }
        s.receiveFdBytes <- c.(*Conn)
    default:
        panic("no connType")
    }
}

我们只需要判断connType 就可以看到是新连接加入,还是之前的连接再次发送的消息。
如果是新连接的加入,我们首先应该做的处理就是从系统中读取到该连接的句柄,并取出其中的内容,进行解析。也就是 s.addConn方法。

// @Author WangKan
// @Description //如果有新的连接,就取出系统中的fd,添加到当前的conns中。
// @Date 2021/2/2 21:37
func (s *Server) addConn(fd int) (newFd int) {
    newFd, _, err := syscall.Accept(fd)
    fmt.Printf("系统描述符新建的链接:%+v\n", newFd)
    if err != nil {
        fmt.Println("accept err: ", err)
        return
    }
    //设置fd为非阻塞
    if err := syscall.SetNonblock(newFd, true); err != nil {
        os.Exit(1)
    }
    //把这个链接加入到epoll中
    s.ep.eAdd(newFd)
    return
}

在该方法中,传入的参数是GlobalFd,这个时候首先我们应该调用syscall.Accept方法找到新加入的句柄,然后将该句柄设置为非阻塞模式,从而添加到epoll模型中去进行管理。

2、读取新连接中的Header

当我们解析到了新的连接句柄之后,我们需要对该句柄中的内容进行解析,首先我们应该获取其中的内容:

// utils.go
// @Author WangKan
// @Description //获取客户端发送的Header头
// @Date 2021/2/24 15:13
// @Param
// @return
func GetHeaderContent(fd int)  (string,int) {
    for{
        var buf [1024]byte
        nbytes, _ := syscall.Read(fd, buf[:])
        if nbytes > 0 {
            return string(buf[:]),nbytes
        }
    }
}

通过调用syscall.Read()方法,我们可以将连接中的内容取出,取出之后是一个[]byte类型,然后我们转换为string,并将该内容返回给调用方。

3、为Header进行解析
在获取到Header头之后,我们需要解析header头的内容。在本项目中,是先将Header信息变成一个Map,然后遍历该map进行判断的。
具体代码为:

// @Author WangKan
// @Description //将Header头转换为map
// @Date 2021/2/24 15:13
// @Param
// @return
func FormatHeader(buf string,length int)(map[string]string){
    var header =make(map[string]string)
    str:=""
    index:=0
    for i:=0;i<length;i++{
        if buf[i]==13{
            if index == 0 {
                arr:=strings.Split(str," ")
                fmt.Println(arr)
                header["Method"]=arr[0]
                str=""
                index++
                continue
            }
            if str != ""{
                arr:=strings.Split(str,": ")
                header[arr[0]]=arr[1]
                str=""
            }
        }else if buf[i]==10{
            continue
        }else{
            str += string(buf[i])
        }
    }
    return header
}

通过上述的代码,我们最终返回给调用方一个字符串类型的map,然后,我们对其中的关键信息进行判断,判断的代码来源于:gorilla/websocket/server.go

func (u *Upgrader) Upgrade(fd int, header map[string]string, s *Server) (*Conn, error) {
    const badHandshake = "websocket: the client is not using the websocket protocol: "

    if header["Connection"] != "Upgrade" && header["Connection"] != "upgrade" {
        return u.returnError(http.StatusBadRequest, badHandshake+"'upgrade' token not found in 'Connection' header")
    }

    if header["Upgrade"] != "websocket" {
        return u.returnError(http.StatusBadRequest, badHandshake+"'websocket' token not found in 'Upgrade' header")
    }

    if header["Method"] != "GET" {
        return u.returnError(http.StatusMethodNotAllowed, badHandshake+"request method is not GET")
    }
    if header["Sec-WebSocket-Version"] != "13" {
        return u.returnError(http.StatusBadRequest, "websocket: unsupported version: 13 not found in 'Sec-Websocket-Version' header")
    }

    //if _, ok := header["Sec-WebSocket-Extensions"]; ok {
    //  return u.returnError(http.StatusInternalServerError, "websocket: application specific 'Sec-WebSocket-Extensions' headers are unsupported")
    //}

    challengeKey := header["Sec-WebSocket-Key"]
    if challengeKey == "" {
        return u.returnError(http.StatusBadRequest, "websocket: not a websocket handshake: 'Sec-WebSocket-Key' header is missing or blank")
    }
    c := newConn(fd, s)
    // Use larger of hijacked buffer and connection write buffer for header.
    wf := []byte{}
    wf = append(wf, "HTTP/1.1 101 Switching Protocols\r\nUpgrade: websocket\r\nConnection: Upgrade\r\nSec-WebSocket-Accept: "...)

    wf = append(wf, computeAcceptKey(challengeKey)...)
    wf = append(wf, "\r\n"...)
    wf = append(wf, "\r\n"...)
    c.handShake <- Message{
        MessageType: -1,
        Content:     wf,
    }

    return c, nil
}

其中精简了一部分代码,是由于一些配置项我们还没有用到。
通过这个方法,我们最终返回的是wf,就是需要传送给客户端的头信息。
然后我们通过调用 syscall.Write()方法,就可以将这些内容写入对应的客户端句柄中,系统会将内容自动推送给客户端。

//server.go
n, err := syscall.Write(fd, heade.Content)
上一篇下一篇

猜你喜欢

热点阅读