WebSocket 的实现
网络协议的
长连接: 一个链接上可以连续发送多个数据包,在链接期间,如果没有数据包发送,需要双方发链路检查包
TCP/IP: TCP/IP 属于传输层,主要解决网络中的数据传输问题,只管传输数据。但这样对传输的数据没有一个规范的封装、解析等处理。使得传输的数据难以识别,所以才有了应用层协议对数据进行的封装、解析等,如http协议。
HTTP: HTTP协议是应用层协议,用于分装解析传输数据。 从HTTP1.1开始其实就默认开启了长链接,也就是请求头header中可以看到Connection:Keep-alive。但是长连接只是说保持了(服务器可以告诉客户端保持时间Keep-Alive:timeout=20;max=20;)这个TCP通道,并采用服务器和客户端应答模式(Request-Response),不需要再创建一个链接通道,做到一个性能优化。
socket: 与HTTP协议不一样,socket不是协议,他是在程序层面上对传输层协议(像TCP/IP)的接口封装。我们知道传输层的协议,是解决数据在网络中传输的问题的,那么socket(套接字)就是传输通道两端的接口。
Websocket: WebSocket是包装成了一个应用层协议作为socket,从而能够让客户端和远程服务端通过web建立全双工通信。
WebSocket API (客户端)
WebSocket API 是HTML5 推出的东西。在客户端我们可以通过HTML5 所提供的API 对websocket 进行创建、发送数据、监听信息、监听报错等功能(HTML5 WebSocket)
if("WebSocket" in window) {
//创建WebSocket实例,可以使用ws和wss。第二个参数可以选填自定义协议,如果多协议,可以以数组方式
let ws = new WebSocket("ws://127.0.0.1:8091");
// 用于指定连接成功后的回调函数。
ws.onopen = (event) => {
console.log("WebSocket 链接成功");
}
// 用于指定当从服务器接受到信息时的回调函数。
ws.onmessage = (data) => {
console.log(data);
}
// 用于指定连接关闭后的回调函数。
ws.onerror = (e) => {
console.error("WebSocket error observed:", e);
}
//....
else {
alert("该浏览器不支持 WebSocket")
}
WebSocket (Node 服务端)
我们知道WebSocket 是在Socket的基础上实现的,所以我们要做的是对现有的Socket协议进行升级。
步骤: 客户端发送websocket请求-->服务端接受并识别该请求-->对该请求协议进行升级--> 返回给客户端 --> websocket 通道建立 --> 客户端/服务端发送数据
协议升级
// 创建websocket
const http = require("http");
const crypto = require("crypto");
const MAGIC_STRINC = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"; // 固定值
const port = 8091;
let server = http.createServer();
// http服务器部分
server.on("request", (req, res) => {
res.end("websocket test");
})
// upgrade 请求(升级通讯协议)
server.on("upgrade", (req, socket, head) => {
// 加密 Sec-Websocket-Accept 值
const swa = crypto.createHash('sha1').update(req.headers['sec-websocket-key'] + MAGIC_STRINC).digest('base64');
// 构造响应头部
let resHeaders = ([
'HTTP/1.1 101 Switching Protocols', //必需。响应头。状态码为101。任何非101的响应都为握手未完成。但是HTTP语义是存在的。
'Upgrade: websocket', // 必需。升级类型。
'Connection: Upgrade', //必需。本次连接类型为升级。
`Sec-Websocket-Accept: ${swa}` //必需。表明服务器是否愿意接受连接。如果接受,值就必须是通过上面算法得到的值。
]).concat('','').join('\r\n');
// 返回升级协议信息 完成WebSocket通道建立
socket.write(resHeaders);
})
// 启动服务器
server.listen(port, ()=> {
let dateTime =
(new Date()).getFullYear() +
"-" + ((new Date()).getMonth() + 1) +
"-" + (new Date()).getDate() +
" " + (new Date()).getHours() +
":" + (new Date()).getMinutes() +
":" + (new Date()).getSeconds();
console.log(`${dateTime} server start success: 127.0.0.1:${port}`)
})
在这里需要注意的是头部信息和头部信息中的Sec-Websocket-Accept的值。
该值需要是一个通过base64加密的哈希值(sha1)。 而该加密所用的数据是客户端传过来的sec-websocket-key的值和MAGIC_STRINC内的固定值。 对MAGIC_STRINC的说明
WebSocket 数据传输
Webscoket 中传输的数据是数据帧(frame)
数据帧有多种类型 主要有:文本型、二进制数据
数据帧结构
每一列代表一个字节,一个字节8位,每一位又代表一个二进制数。
- fin: 标识这一帧数据是否是该分块的最后一帧。
1 为最后一帧
0 不是最后一帧。需要分为多个帧传输
- RSV1、RSV2、RSV3
扩展字段,除非一个扩展经过协商赋予了非零值的某种含义,否则必须为0
- opcode
解释 payload data 的类型,如果收到识别不了的opcode,直接断开。
分类值如下:
%x0:连续的帧
%x1:text帧
%x2:binary帧
%x3 - 7:为非控制帧而预留的
%x8:关闭握手帧
%x9:ping帧
%xA:pong帧
%xB - F:为非控制帧而预留的
- masked: 占第二个字节的一位,定义了masking-key是否存在。并且使用masking-key掩码解析Payload data。
1 客户端发送数据到服务端
0 服务端发送数据到客户端
- payload length: 表示Payload data的总长度。占7位,或者7+2个字节、或者7+8个字节。
0-125,则是payload的真实长度
126,则后面2个字节形成的16位无符号整型数的值是payload的真实长度,125<数据长度<65535
127,则后面8个字节形成的64位无符号整型数的值是payload的真实长度,数据长度>65535
- masking key: 0或4字节,当masked为1的时候才存在,为4个字节,否则为0,用于对我们需要的数据进行解密
- payload data: 我们需要的数据,如果masked为1,该数据会被加密,要通过masking key进行异或运算解密才能获取到真实数据。
创建数据帧
// 创建数据帧
function createDate(data) {
let dataType = Buffer.isBuffer(data); // 判断数据是否是buffer类型
let dataBuf, // 需要发送的二进制数据
dataLength, // 数据真实长度
dataIndex = 2; // 数据的起始位置
let frame; // 用来存储封装好的数据帧
if(dataType) {
dataBuf = data;
} else {
dataBuf = Buffer.from(data);
}
dataLength = dataBuf.byteLength;
// 计算payload data在frame中的起始位置
dataIndex = dataIndex + (dataLength > 65535 ? 8 : (dataLength > 125 ? 2 : 0));
// 创建多大空间
frame = new Buffer.alloc(dataIndex + dataLength);
// 第一个字节, fin = 1, opcode = 1
frame[0] = parseInt(10000001, 2);
// 长度超过65535 的由8个字节表示
if(dataLength > 65535) {
frame[1] = 127; // 第二个字节
frame.writeUInt32BE(0, 2); // (值, 写入之前要跳过的位置)
frame.writeUInt32BE(dataLength, 6);
} else if (dataLength > 125) {
frame[1] = 126;
frame.writeUInt16BE(dataLength, 2);
} else {
frame[1] = dataLength;
}
// 发送给您客户端的数据
frame.write(dataBuf.toString(), dataIndex); // (数据, 数据写入到buffer的位置)
// this.write() == socket.write() 发送数据给客户端
this.write(frame);
}
解数据帧
// 获取客户端数据状态
function getHandleDateState(data) {
let dataIndex = 2; // 数据索引 应为第一个字节和第二个字节肯定不是数据,所以数据从初始值2开始
let secondeByte = data[1]; // 代表masked位和可能是payloadLength位的第二个字节
let hasMask = secondeByte >= 128; // 如果大于大于或等一128, 说明masked为1
let dataLength, maskedData;
// 如果数据为126, 则后面16位长度为数据位,如果为127 则后面64位长度的数据为数据长度
if(secondeByte == 126) {
dataIndex += 2;
dataLength = data.readUInt16BE(2);
} else if(secondeByte == 127) {
dataIndex += 8;
dataLength = data.readUInt32BE(2) + data.readUInt32BE(6);
} else {
dataLength = secondeByte;
}
// 如果有掩码, 则获取32位的二进制masking key, 同时更新index
if(hasMask) {
maskedData = data.slice(dataIndex, dataIndex + 4);
dataIndex += 4;
}
// 数据量最大位10kb
if(dataLength > 10240) {
this.send("warning : data limit 10kb");
} else {
// dataIndex 位数据位的起始位置, datalength 位数据长度, maskedData 为二进制的解密数据
this.stat = {
index: dataIndex,
totalLength: dataLength,
length: dataLength,
maskedData: maskedData,
opcode: parseInt(data[0].toString(16).split("")[1], 16),
}
}
}
// 解数据帧
function decode(data, key) {
getHandleDateState.apply(this, [data]);
let stat;
this.datas;
if(!(stat = this.stat)) return;
// 如果opcode为9, 则发送pong响应, 如果opcode为10则置pingtimes 为0
if(stat.opcode === 9 || stat === 10) {
(stat.opcode === 9) ? (this.sendPong()) : (this.pingTimes = 0);
this.reset();
return;
}
else if(stat.opcode === 8) {
console.log(key)
socketMap.delete(key);
this.end();
return;
}
let result;
if (stat.maskedData) {
result = Buffer.alloc(data.length-stat.index);
for (var i = stat.index, j = 0; i < data.length; i++, j++) {
//对每个字节进行异或运算,masked是4个字节,所以%4,借此循环
result[j] = data[i] ^ stat.maskedData[j % 4];
}
} else {
result = data.slice(stat.index, data.length);
}
this.datasd = result;
stat.length -= (data.length - stat.index);
//当长度为0,说明当前帧为最后帧
if (stat.length == 0) {
var buf = Buffer.concat(this.datas, stat.totalLength);
console.log(stat.opcode)
if (stat.opcode == 8) {
this.close(buf.toString());
} else {
this.emit("message", buf.toString());
}
this.reset();
}
// 打印客户端信息
console.log(this.datasd.toString())
}
心跳检查
由于websocket 不进行交互会关闭通道所以,才有了心跳检查。
// 心跳检测
function checkHeartBeat (index) {
this.pingTimes = 0; // 记录心跳次数
let c = setInterval(() => {
if(this.pingTimes > 4) {
this.end();
socketMap.delete(index);
clearInterval(c);
} else {
sendPring.apply(this, []);
this.pingTimes++;
}
}, 5000);
}
// 发送心跳
function sendPring() {
this.write(Buffer.from(['0x89', '0x0']))
}
完整代码(服务端)
// 创建websocket
const http = require("http");
const crypto = require("crypto")
const port = 8091;
const MAGIC_STRINC = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"; // 固定值
let socketMap = new Map();
let count = 0;
let server = http.createServer();
// http服务器部分
server.on("request", (req, res) => {
res.end("websocket test");
})
// upgrade 请求(升级通讯协议)
server.on("upgrade", (req, socket, head) => {
// 加密 Sec-Websocket-Accept 值
const swa = crypto.createHash('sha1').update(req.headers['sec-websocket-key'] + MAGIC_STRINC).digest('base64');
// 构造响应头部
let resHeaders = ([
'HTTP/1.1 101 Switching Protocols',
'Upgrade: websocket',
'Connection: Upgrade',
`Sec-Websocket-Accept: ${swa}`
]).concat('','').join('\r\n');
socket.write(resHeaders);
crateSocketMap(socket);
})
// socket Map
function crateSocketMap (socket) {
let index = count++;
socketMap.set(index, socket);
let number = parseInt(Math.random() * 10);
let c = setTimeout(() => {
if(socketMap.get(index)) {
createDate.call(socket, number.toString());
number++;
} else {
clearInterval(c)
}
}, 2000);
socket.on("data", (data) => {
decode.apply(socket, [data, index]);
})
socket.on("error", (err) => {
console.log(err)
})
checkHeartBeat.apply(socket, [index]);
}
// 心跳检测
function checkHeartBeat (index) {
this.pingTimes = 0; // 记录心跳次数
let c = setInterval(() => {
if(this.pingTimes > 4) {
this.end();
socketMap.delete(index);
clearInterval(c);
} else {
sendPring.apply(this, []);
this.pingTimes++;
}
}, 5000);
}
// 发送心跳
function sendPring() {
this.write(Buffer.from(['0x89', '0x0']))
}
// 解数据帧
function decode(data, key) {
getHandleDateState.apply(this, [data]);
let stat;
this.datas;
if(!(stat = this.stat)) return;
// 如果opcode为9, 则发送pong响应, 如果opcode为10则置pingtimes 为0
if(stat.opcode === 9 || stat === 10) {
(stat.opcode === 9) ? (this.sendPong()) : (this.pingTimes = 0);
this.reset();
return;
}
else if(stat.opcode === 8) {
console.log(key)
socketMap.delete(key);
this.end();
return;
}
let result;
if (stat.maskedData) {
result = Buffer.alloc(data.length-stat.index);
for (var i = stat.index, j = 0; i < data.length; i++, j++) {
//对每个字节进行异或运算,masked是4个字节,所以%4,借此循环
result[j] = data[i] ^ stat.maskedData[j % 4];
}
} else {
result = data.slice(stat.index, data.length);
}
this.datasd = result;
stat.length -= (data.length - stat.index);
//当长度为0,说明当前帧为最后帧
if (stat.length == 0) {
var buf = Buffer.concat(this.datas, stat.totalLength);
console.log(stat.opcode)
if (stat.opcode == 8) {
this.close(buf.toString());
} else {
this.emit("message", buf.toString());
}
this.reset();
}
// 打印客户端信息
console.log(this.datasd.toString())
}
// 获取客户端数据状态
function getHandleDateState(data) {
let dataIndex = 2; // 数据索引 应为第一个字节和第二个字节肯定不是数据,所以数据从初始值2开始
let secondeByte = data[1]; // 代表masked位和可能是payloadLength位的第二个字节
let hasMask = secondeByte >= 128; // 如果大于大于或等一128, 说明masked为1
let dataLength, maskedData;
// 如果数据为126, 则后面16位长度为数据位,如果为127 则后面64位长度的数据为数据长度
if(secondeByte == 126) {
dataIndex += 2;
dataLength = data.readUInt16BE(2);
} else if(secondeByte == 127) {
dataIndex += 8;
dataLength = data.readUInt32BE(2) + data.readUInt32BE(6);
} else {
dataLength = secondeByte;
}
// 如果有掩码, 则获取32位的二进制masking key, 同时更新index
if(hasMask) {
maskedData = data.slice(dataIndex, dataIndex + 4);
dataIndex += 4;
}
// 数据量最大位10kb
if(dataLength > 10240) {
this.send("warning : data limit 10kb");
} else {
// dataIndex 位数据位的起始位置, datalength 位数据长度, maskedData 为二进制的解密数据
this.stat = {
index: dataIndex,
totalLength: dataLength,
length: dataLength,
maskedData: maskedData,
opcode: parseInt(data[0].toString(16).split("")[1], 16),
}
}
}
// 创建数据帧
function createDate(data) {
let dataType = Buffer.isBuffer(data); // 判断数据是否是buffer类型
let dataBuf, // 需要发送的二进制数据
dataLength, // 数据真实长度
dataIndex = 2; // 数据的起始位置
let frame; // 用来存储封装好的数据帧
if(dataType) {
dataBuf = data;
} else {
dataBuf = Buffer.from(data);
}
dataLength = dataBuf.byteLength;
// 计算payload data在frame中的起始位置
dataIndex = dataIndex + (dataLength > 65535 ? 8 : (dataLength > 125 ? 2 : 0));
// 创建多大空间
frame = new Buffer.alloc(dataIndex + dataLength);
// 第一个字节, fin = 1, opcode = 1
frame[0] = parseInt(10000001, 2);
// 长度超过65535 的由8个字节表示
if(dataLength > 65535) {
frame[1] = 127; // 第二个字节
frame.writeUInt32BE(0, 2); // (值, 写入之前要跳过的位置)
frame.writeUInt32BE(dataLength, 6);
} else if (dataLength > 125) {
frame[1] = 126;
frame.writeUInt16BE(dataLength, 2);
} else {
frame[1] = dataLength;
}
// 发送给您客户端的数据
frame.write(dataBuf.toString(), dataIndex); // (数据, 数据写入到buffer的位置)
// this.write() == socket.write() 发送数据给客户端
this.write(frame);
}
// 启动服务器
server.listen(port, ()=> {
let dateTime =
(new Date()).getFullYear() +
"-" + ((new Date()).getMonth() + 1) +
"-" + (new Date()).getDate() +
" " + (new Date()).getHours() +
":" + (new Date()).getMinutes() +
":" + (new Date()).getSeconds();
console.log(`${dateTime} server start success: 127.0.0.1:${port}`)
})