WebSocket 的实现

2020-08-23  本文已影响0人  空腹无才

网络协议的

长连接: 一个链接上可以连续发送多个数据包,在链接期间,如果没有数据包发送,需要双方发链路检查包

TCP/IP: TCP/IP 属于传输层,主要解决网络中的数据传输问题,只管传输数据。但这样对传输的数据没有一个规范的封装、解析等处理。使得传输的数据难以识别,所以才有了应用层协议对数据进行的封装、解析等,如http协议。

HTTP: HTTP协议是应用层协议,用于分装解析传输数据。 从HTTP1.1开始其实就默认开启了长链接,也就是请求头header中可以看到Connection:Keep-alive。但是长连接只是说保持了(服务器可以告诉客户端保持时间Keep-Alive:timeout=20;max=20;)这个TCP通道,并采用服务器和客户端应答模式(Request-Response),不需要再创建一个链接通道,做到一个性能优化。

socket: 与HTTP协议不一样,socket不是协议,他是在程序层面上对传输层协议(像TCP/IP)的接口封装。我们知道传输层的协议,是解决数据在网络中传输的问题的,那么socket(套接字)就是传输通道两端的接口。

Websocket: WebSocket是包装成了一个应用层协议作为socket,从而能够让客户端和远程服务端通过web建立全双工通信。

WebSocket API (客户端)

WebSocket API 是HTML5 推出的东西。在客户端我们可以通过HTML5 所提供的API 对websocket 进行创建、发送数据、监听信息、监听报错等功能(HTML5 WebSocket)

if("WebSocket" in window) {
    //创建WebSocket实例,可以使用ws和wss。第二个参数可以选填自定义协议,如果多协议,可以以数组方式
    let ws = new WebSocket("ws://127.0.0.1:8091");
    
    // 用于指定连接成功后的回调函数。
    ws.onopen = (event) => {
        console.log("WebSocket 链接成功");
    }
    
    // 用于指定当从服务器接受到信息时的回调函数。
    ws.onmessage = (data) => {
        console.log(data);
    }
    
    // 用于指定连接关闭后的回调函数。
    ws.onerror = (e) => {
        console.error("WebSocket error observed:", e);
    }
    
    //....
else {
    alert("该浏览器不支持 WebSocket")
}

WebSocket (Node 服务端)

我们知道WebSocket 是在Socket的基础上实现的,所以我们要做的是对现有的Socket协议进行升级。

步骤: 客户端发送websocket请求-->服务端接受并识别该请求-->对该请求协议进行升级--> 返回给客户端 --> websocket 通道建立 --> 客户端/服务端发送数据

协议升级

// 创建websocket
const http = require("http");
const crypto = require("crypto");

const MAGIC_STRINC = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"; // 固定值
const port = 8091;

let server = http.createServer();

// http服务器部分
server.on("request", (req, res) => {
    res.end("websocket test");
})

// upgrade 请求(升级通讯协议)
server.on("upgrade", (req, socket, head) => {
    // 加密 Sec-Websocket-Accept 值
    const swa = crypto.createHash('sha1').update(req.headers['sec-websocket-key'] + MAGIC_STRINC).digest('base64');
    
    // 构造响应头部
    let resHeaders = ([
        'HTTP/1.1 101 Switching Protocols', //必需。响应头。状态码为101。任何非101的响应都为握手未完成。但是HTTP语义是存在的。
        'Upgrade: websocket', // 必需。升级类型。
        'Connection: Upgrade', //必需。本次连接类型为升级。
        `Sec-Websocket-Accept: ${swa}` //必需。表明服务器是否愿意接受连接。如果接受,值就必须是通过上面算法得到的值。
    ]).concat('','').join('\r\n');
    
    // 返回升级协议信息  完成WebSocket通道建立
    socket.write(resHeaders);
})

// 启动服务器
server.listen(port, ()=> {
    let dateTime = 
        (new Date()).getFullYear() +
        "-" + ((new Date()).getMonth() + 1) +
        "-" + (new Date()).getDate() +
        " " + (new Date()).getHours() +
        ":" + (new Date()).getMinutes() +
        ":" + (new Date()).getSeconds();
    console.log(`${dateTime} server start success: 127.0.0.1:${port}`)
})

在这里需要注意的是头部信息和头部信息中的Sec-Websocket-Accept的值。

该值需要是一个通过base64加密的哈希值(sha1)。 而该加密所用的数据是客户端传过来的sec-websocket-key的值和MAGIC_STRINC内的固定值。 对MAGIC_STRINC的说明

WebSocket 数据传输

Webscoket 中传输的数据是数据帧(frame)

数据帧有多种类型 主要有:文本型、二进制数据

数据帧结构

68747470733a2f2f692e696d6775722e636f6d2f79506350784a332e706e67.png

每一列代表一个字节,一个字节8位,每一位又代表一个二进制数。

    1 为最后一帧
    0 不是最后一帧。需要分为多个帧传输
扩展字段,除非一个扩展经过协商赋予了非零值的某种含义,否则必须为0
解释 payload data 的类型,如果收到识别不了的opcode,直接断开。
分类值如下:
%x0:连续的帧 
%x1:text帧 
%x2:binary帧 
%x3 - 7:为非控制帧而预留的 
%x8:关闭握手帧 
%x9:ping帧 
%xA:pong帧 
%xB - F:为非控制帧而预留的
    1 客户端发送数据到服务端
    0 服务端发送数据到客户端
    0-125,则是payload的真实长度
    126,则后面2个字节形成的16位无符号整型数的值是payload的真实长度,125<数据长度<65535
    127,则后面8个字节形成的64位无符号整型数的值是payload的真实长度,数据长度>65535

创建数据帧

// 创建数据帧
function createDate(data) {
    let dataType = Buffer.isBuffer(data); // 判断数据是否是buffer类型
    let dataBuf, // 需要发送的二进制数据
        dataLength, // 数据真实长度
        dataIndex = 2; // 数据的起始位置
    let frame; // 用来存储封装好的数据帧

    if(dataType) {
        dataBuf = data;
    } else {
        dataBuf = Buffer.from(data);
    }

    dataLength = dataBuf.byteLength;

    // 计算payload data在frame中的起始位置
    dataIndex = dataIndex + (dataLength > 65535 ? 8 : (dataLength > 125 ? 2 : 0));
    // 创建多大空间
    frame = new Buffer.alloc(dataIndex + dataLength);

    // 第一个字节, fin = 1, opcode = 1
    frame[0] = parseInt(10000001, 2);

    // 长度超过65535 的由8个字节表示
    if(dataLength > 65535) {
        frame[1] = 127; // 第二个字节
        frame.writeUInt32BE(0, 2); // (值, 写入之前要跳过的位置)
        frame.writeUInt32BE(dataLength, 6);
    } else if (dataLength > 125) {
        frame[1] = 126;
        frame.writeUInt16BE(dataLength, 2);
    } else {
        frame[1] = dataLength;
    }

    // 发送给您客户端的数据
    frame.write(dataBuf.toString(), dataIndex); // (数据, 数据写入到buffer的位置)

    // this.write() == socket.write()  发送数据给客户端
    this.write(frame);

}

解数据帧

// 获取客户端数据状态
function getHandleDateState(data) {
        let dataIndex = 2; // 数据索引 应为第一个字节和第二个字节肯定不是数据,所以数据从初始值2开始
        let secondeByte = data[1]; // 代表masked位和可能是payloadLength位的第二个字节
        let hasMask = secondeByte >= 128; // 如果大于大于或等一128, 说明masked为1

        let dataLength, maskedData;

        // 如果数据为126, 则后面16位长度为数据位,如果为127 则后面64位长度的数据为数据长度
        if(secondeByte == 126) {
            dataIndex += 2;
            dataLength = data.readUInt16BE(2);

        } else if(secondeByte == 127) {
            dataIndex += 8;
            dataLength = data.readUInt32BE(2) + data.readUInt32BE(6);
        } else {
            dataLength = secondeByte;
        }

        // 如果有掩码, 则获取32位的二进制masking key, 同时更新index
        if(hasMask) {
            maskedData = data.slice(dataIndex, dataIndex + 4);
            dataIndex += 4;
        }

        // 数据量最大位10kb
        if(dataLength > 10240) {
            this.send("warning : data limit 10kb");
        } else {
            // dataIndex 位数据位的起始位置, datalength 位数据长度, maskedData 为二进制的解密数据
            this.stat = {
                index: dataIndex,
                totalLength: dataLength,
                length: dataLength,
                maskedData: maskedData,
                opcode: parseInt(data[0].toString(16).split("")[1], 16),
            }
        }
}



// 解数据帧
function decode(data, key) {
    getHandleDateState.apply(this, [data]);
    let stat;
    this.datas;

    if(!(stat = this.stat)) return;

    // 如果opcode为9, 则发送pong响应, 如果opcode为10则置pingtimes 为0
    if(stat.opcode === 9 || stat === 10) {
        (stat.opcode === 9) ? (this.sendPong()) : (this.pingTimes = 0);
        this.reset();
        return;
    } 
    else if(stat.opcode === 8) {
        console.log(key)
        socketMap.delete(key);
        this.end();
        return;
    }

    let result;
    if (stat.maskedData) {
        result = Buffer.alloc(data.length-stat.index);
        for (var i = stat.index, j = 0; i < data.length; i++, j++) {
            //对每个字节进行异或运算,masked是4个字节,所以%4,借此循环
            result[j] = data[i] ^ stat.maskedData[j % 4];
        }
    } else {
        result = data.slice(stat.index, data.length);
    }

    this.datasd = result;
    stat.length -= (data.length - stat.index);
    //当长度为0,说明当前帧为最后帧
    if (stat.length == 0) {
        var buf = Buffer.concat(this.datas, stat.totalLength);
        console.log(stat.opcode)
        if (stat.opcode == 8) {
            this.close(buf.toString());
        } else {
            this.emit("message", buf.toString());
        }

        this.reset();
    }
    // 打印客户端信息
    console.log(this.datasd.toString())
}

心跳检查

由于websocket 不进行交互会关闭通道所以,才有了心跳检查。

// 心跳检测
function checkHeartBeat (index) {
    this.pingTimes = 0; // 记录心跳次数

    let c =  setInterval(() => {
       if(this.pingTimes > 4) {
           this.end();
           socketMap.delete(index);
           clearInterval(c);
       } else {
           sendPring.apply(this, []);
           this.pingTimes++;
       }
    }, 5000);
}

// 发送心跳
function sendPring() {
    this.write(Buffer.from(['0x89', '0x0']))
}

完整代码(服务端)

// 创建websocket
const http = require("http");
const crypto = require("crypto")

const port = 8091;
const MAGIC_STRINC = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"; // 固定值
let socketMap = new Map();
let count = 0;

let server = http.createServer();


// http服务器部分
server.on("request", (req, res) => {
    res.end("websocket test");
})

// upgrade 请求(升级通讯协议)
server.on("upgrade", (req, socket, head) => {
    // 加密 Sec-Websocket-Accept 值
    const swa = crypto.createHash('sha1').update(req.headers['sec-websocket-key'] + MAGIC_STRINC).digest('base64');

    // 构造响应头部
    let resHeaders = ([
        'HTTP/1.1 101 Switching Protocols',
        'Upgrade: websocket',
        'Connection: Upgrade',
        `Sec-Websocket-Accept: ${swa}`
    ]).concat('','').join('\r\n');

    socket.write(resHeaders);
    crateSocketMap(socket);
})

// socket Map
function crateSocketMap (socket) {
    let index = count++;
    socketMap.set(index, socket);

    let number = parseInt(Math.random() * 10); 
    let c = setTimeout(() => {
        if(socketMap.get(index)) {
            createDate.call(socket, number.toString());
            number++;
        } else {
            clearInterval(c)
        }
    }, 2000);

    socket.on("data", (data) => {
        decode.apply(socket, [data, index]);
    })

    socket.on("error", (err) => {
        console.log(err)
    })

    checkHeartBeat.apply(socket, [index]);
}

// 心跳检测
function checkHeartBeat (index) {
    this.pingTimes = 0; // 记录心跳次数

    let c =  setInterval(() => {
       if(this.pingTimes > 4) {
           this.end();
           socketMap.delete(index);
           clearInterval(c);
       } else {
           sendPring.apply(this, []);
           this.pingTimes++;
       }
    }, 5000);
}

// 发送心跳
function sendPring() {
    this.write(Buffer.from(['0x89', '0x0']))
}




// 解数据帧
function decode(data, key) {
    getHandleDateState.apply(this, [data]);
    let stat;
    this.datas;

    if(!(stat = this.stat)) return;

    // 如果opcode为9, 则发送pong响应, 如果opcode为10则置pingtimes 为0
    if(stat.opcode === 9 || stat === 10) {
        (stat.opcode === 9) ? (this.sendPong()) : (this.pingTimes = 0);
        this.reset();
        return;
    } 
    else if(stat.opcode === 8) {
        console.log(key)
        socketMap.delete(key);
        this.end();
        return;
    }

    let result;
    if (stat.maskedData) {
        result = Buffer.alloc(data.length-stat.index);
        for (var i = stat.index, j = 0; i < data.length; i++, j++) {
            //对每个字节进行异或运算,masked是4个字节,所以%4,借此循环
            result[j] = data[i] ^ stat.maskedData[j % 4];
        }
    } else {
        result = data.slice(stat.index, data.length);
    }

    this.datasd = result;
    stat.length -= (data.length - stat.index);
    //当长度为0,说明当前帧为最后帧
    if (stat.length == 0) {
        var buf = Buffer.concat(this.datas, stat.totalLength);
        console.log(stat.opcode)
        if (stat.opcode == 8) {
            this.close(buf.toString());
        } else {
            this.emit("message", buf.toString());
        }

        this.reset();
    }
    // 打印客户端信息
    console.log(this.datasd.toString())
}


// 获取客户端数据状态
function getHandleDateState(data) {
        let dataIndex = 2; // 数据索引 应为第一个字节和第二个字节肯定不是数据,所以数据从初始值2开始
        let secondeByte = data[1]; // 代表masked位和可能是payloadLength位的第二个字节
        let hasMask = secondeByte >= 128; // 如果大于大于或等一128, 说明masked为1

        let dataLength, maskedData;

        // 如果数据为126, 则后面16位长度为数据位,如果为127 则后面64位长度的数据为数据长度
        if(secondeByte == 126) {
            dataIndex += 2;
            dataLength = data.readUInt16BE(2);

        } else if(secondeByte == 127) {
            dataIndex += 8;
            dataLength = data.readUInt32BE(2) + data.readUInt32BE(6);
        } else {
            dataLength = secondeByte;
        }

        // 如果有掩码, 则获取32位的二进制masking key, 同时更新index
        if(hasMask) {
            maskedData = data.slice(dataIndex, dataIndex + 4);
            dataIndex += 4;
        }

        // 数据量最大位10kb
        if(dataLength > 10240) {
            this.send("warning : data limit 10kb");
        } else {
            // dataIndex 位数据位的起始位置, datalength 位数据长度, maskedData 为二进制的解密数据
            this.stat = {
                index: dataIndex,
                totalLength: dataLength,
                length: dataLength,
                maskedData: maskedData,
                opcode: parseInt(data[0].toString(16).split("")[1], 16),
            }
        }
}

// 创建数据帧
function createDate(data) {
    let dataType = Buffer.isBuffer(data); // 判断数据是否是buffer类型
    let dataBuf, // 需要发送的二进制数据
        dataLength, // 数据真实长度
        dataIndex = 2; // 数据的起始位置
    let frame; // 用来存储封装好的数据帧

    if(dataType) {
        dataBuf = data;
    } else {
        dataBuf = Buffer.from(data);
    }

    dataLength = dataBuf.byteLength;

    // 计算payload data在frame中的起始位置
    dataIndex = dataIndex + (dataLength > 65535 ? 8 : (dataLength > 125 ? 2 : 0));
    // 创建多大空间
    frame = new Buffer.alloc(dataIndex + dataLength);

    // 第一个字节, fin = 1, opcode = 1
    frame[0] = parseInt(10000001, 2);

    // 长度超过65535 的由8个字节表示
    if(dataLength > 65535) {
        frame[1] = 127; // 第二个字节
        frame.writeUInt32BE(0, 2); // (值, 写入之前要跳过的位置)
        frame.writeUInt32BE(dataLength, 6);
    } else if (dataLength > 125) {
        frame[1] = 126;
        frame.writeUInt16BE(dataLength, 2);
    } else {
        frame[1] = dataLength;
    }

    // 发送给您客户端的数据
    frame.write(dataBuf.toString(), dataIndex); // (数据, 数据写入到buffer的位置)

    // this.write() == socket.write()  发送数据给客户端
    this.write(frame);

}


// 启动服务器
server.listen(port, ()=> {
    let dateTime = 
        (new Date()).getFullYear() +
        "-" + ((new Date()).getMonth() + 1) +
        "-" + (new Date()).getDate() +
        " " + (new Date()).getHours() +
        ":" + (new Date()).getMinutes() +
        ":" + (new Date()).getSeconds();
    console.log(`${dateTime} server start success: 127.0.0.1:${port}`)
})

参考

websocket与和他http的区别

基于node实现websocket协议

使用nodeJS在HTTP上实现WebSocket

如何让我的服务器返回正确的Sec-WebSocket-Accept标头值

学习WebSocket协议—从顶层到底层的实现原理

websocket 协议帧 解析

nodejs实现Websocket的数据接收发送

上一篇下一篇

猜你喜欢

热点阅读