Springboot + netty + rabbitmq +
本文章为 一名Android 小白的 自学 搭建 五天的 总结。该篇文章主要为开发过程的记录,所以理论的东西比较少,如果有不足的地方 还望指出,本人 迫切需要这方面大佬指点与学习
前言
WebSocket是 Html5 开始提供的一种浏览器与服务器间 基于TCP的一种新的网络协议 进行全双工通信的网络技术,支持数据在客户端与服务端双向传输,只要握手成功,两端会打开一个长连接进行持续交互。
优点及作用
Http协议的弊端:
Http协议为半双工协议。(半双工:同一时刻,数据只能在客户端和服务端一个方向上传输)
Http协议冗长且繁琐
易收到攻击,如长轮询
非持久化协议
WebSocket的特性:
单一的 TCP 连接,采用全双工模式通信
对代理、防火墙和路由器透明
无头部信息和身份验证
无安全开销
通过 ping/pong 帧保持链路激活
持久化协议,连接建立后,服务器可以主动传递消息给客户端,不再需要客户端轮询
简单说下原理
实现原理
在实现Websocket连线过程中,需要通过浏览器发出Websocket连线请求,然后服务器发出回应,这个过程通常称为握手 。在 WebSocket API,浏览器和服务器只需要做一个握手的动作,然后,浏览器和服务器之间就形成了一条快速通道。两者之间就直接可以数据互相传送。在此WebSocket 协议中,为我们实现即时服务带来了两大好处:
1.Header 互相沟通的Header是很小的-大概只有 2 Bytes
// 事例 Header
GET ws://localhost:5050/websocket HTTP/1.1
Host: localhost:5050
Connection: Upgrade
Pragma: no-cache
Cache-Control: no-cache
Upgrade: websocket
Origin: http://localhost:63342
Sec-WebSocket-Version: 13
User-Agent: Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/61.0.3163.79 Safari/537.36
Accept-Encoding: gzip, deflate, br
Accept-Language: zh-CN,zh;q=0.8
Cookie: Idea-d796403=9d25c0a7-d062-4c0f-a2ff-e4da09ea564e
Sec-WebSocket-Key: IzEaiuZLxeIhjjYDdTp+1g==
Sec-WebSocket-Extensions: permessage-deflate; client_max_window_bits
Sec-WebSocket-Key 是随机生成的,服务端会使用它加密后作为 Sec-WebSocket-Accept 的值返回;
Sec-WebSocket-Protocol 是一个用户定义的字符串,用来区分同URL下,不同的服务所需要的协议;
Sec-WebSocket-Version 是告诉服务器所使用的Websocket Draft(协议版本)
2.Server Push 服务器的推送,服务器不再被动的接收到浏览器的请求之后才返回数据,而是在有新数据时就主动推送给浏览器。
HTTP/1.1 101 Switching Protocols
upgrade: websocket
connection: Upgrade
sec-websocket-accept: nO+qX20rjrTLHaG6iQyllO8KEmA=
经过服务器的返回处理后连接握手成功,后面就可以进行TCP通讯,WebSocket在握手后发送数据并象下层TCP协议那样由用户自定义,还是需要遵循对应的应用协议规范。
项目分支pom.xml 部分依赖
<dependencies>
<!-- https://mvnrepository.com/artifact/io.netty/netty-all -->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.32.Final</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.mybatis.spring.boot</groupId>
<artifactId>mybatis-spring-boot-starter</artifactId>
<version>1.3.2</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.hibernate.javax.persistence</groupId>
<artifactId>hibernate-jpa-2.0-api</artifactId>
<version>1.0.1.Final</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-beans</artifactId>
</dependency>
<!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.6.0</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.31</version>
</dependency>
<!-- 常用库 依赖 -->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.5</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>22.0</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-transport</artifactId>
<version>4.1.27.Final</version>
</dependency>
</dependencies>
1. 创建 Server端
因为部分原因,服务端代码都默认加入了 心跳机制 及 部分 本人项目 业务逻辑,望读者可以 根据部分代码提示分辨。也欢迎评论,本人长期在线....
1.1 Server Netty + 心跳机制
@Component
public class WebSocketServer {
private static final Logger LOG = LoggerFactory.getLogger(WebSocketServer.class);
@Resource
MQSender mqSender;
public void run(int port) throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
// 保持长连接
.option(ChannelOption.SO_BACKLOG,1024)
.option(ChannelOption.TCP_NODELAY,true)
.childOption(ChannelOption.SO_KEEPALIVE,true)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel channel) throws Exception {
ChannelPipeline pipeline = channel.pipeline();
// Http消息编码解码
pipeline.addLast("http-codec", new HttpServerCodec());
// Http消息组装
pipeline.addLast("aggregator", new HttpObjectAggregator(65536));
// WebSocket通信支持
pipeline.addLast("http-chunked", new ChunkedWriteHandler());
// WebSocket服务端Handler
pipeline.addLast("handler", new WebSocketServerHandler(mqSender));
//服务端心跳检测
pipeline.addLast(new IdleStateHandler(Init.SERVER_READ_IDEL_TIME_OUT,
Init.SERVER_WRITE_IDEL_TIME_OUT,Init.SERVER_ALL_IDEL_TIME_OUT, TimeUnit.SECONDS));
//粘包拆包处理
ByteBuf delimiter = Unpooled.copiedBuffer("&&&".getBytes());
/*
* 解码的帧的最大长度为:2048
* 解码时是否去掉分隔符:false
* 解码分隔符每次传输都以该字符结尾:&&&
*/
pipeline.addLast(new DelimiterBasedFrameDecoder(2048,false,delimiter));
pipeline.addLast("decoder", new StringDecoder());
pipeline.addLast("encoder", new StringEncoder());
}
});
Channel channel = bootstrap.bind(port).sync().channel();
LOG.info("clientSocket 已经启动,端口:" + port + ".");
channel.closeFuture().sync();
} finally {
// 释放线程池资源
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
1.2 WebSocketServerHandler
重写 SimpleChannelInboundHandler 方法
- messageReceived:消息接收,判断请求消息来源,从而做不同处理
- channelReadComplete:Channel读取完毕后执行的回调操作
- exceptionCaught:异常后回调操作
@Component
public class WebSocketServerHandler extends SimpleChannelInboundHandler<Object> {
private static final Logger LOG = LoggerFactory.getLogger(WebSocketServerHandler.class);
/**
* 线程安全 linkedList
* 本人自己项目需求 用于存储 客户端设备连接
*/
private static ConcurrentLinkedQueue<ChannelBean> beanList = new ConcurrentLinkedQueue<>();
private WebSocketServerHandshaker handshaker;
private MQSender mqSender;
protected String name;
/**
* 心跳断开次数
*/
private int heartCounter = 0;
public WebSocketServerHandler(MQSender mqSender) {
this.mqSender = mqSender;
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws Exception {
cause.printStackTrace();
ctx.close();
}
/**
* 用户状态监听
* @param ctx ChannelHandlerContext
* @param evt Object
* @throws Exception
*/
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
IdleStateEvent event = (IdleStateEvent) evt;
if (event.state().equals(IdleState.READER_IDLE)){
// 空闲10s之后触发 (心跳包丢失)
if (heartCounter >= 3) {
// 连续丢失3个心跳包 (断开连接)
ctx.channel().close().sync();
LOG.error("已与"+ctx.channel().remoteAddress()+"断开连接");
} else {
heartCounter++;
LOG.debug(ctx.channel().remoteAddress() + "丢失了第 " + heartCounter + " 个心跳包");
}
}
}
}
/**
* 通道信息的读取
* @param ctx ChannelHandlerContext
* @param msg msg
* @throws Exception
*/
@Override
protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
heartCounter = 0;
// 传统的HTTP接入
if (msg instanceof FullHttpRequest) {
handleHttpRequest(ctx, (FullHttpRequest) msg);
}
// WebSocket接入
else if (msg instanceof WebSocketFrame) {
handleWebSocketFrame(ctx, (WebSocketFrame) msg);
}
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
super.channelActive(ctx);
}
/**
* 设备断开
* @param ctx ChannelHandlerContext
*/
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
super.channelInactive(ctx);
beanList.removeIf(channelBean -> channelBean.getChannelId().equals(ctx.channel().id()));
LOG.error("-- remove --" + beanList.toString());
}
private ChannelBean channelBean;
private void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest req)
throws Exception {
// 如果HTTP解码失败,返回HHTP异常
if (!req.decoderResult().isSuccess() || (!"websocket".equals(req.headers().get("Upgrade")))) {
sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HTTP_1_1, BAD_REQUEST));
return;
}
// 判断是否有权限,即 请求url 中有没有传递指定的参数
Map<String, String> parmMap = new RequestParser(req).parse();
if (parmMap.get("id").equals("10") || parmMap.get("id").equals("1") || parmMap.get("id").equals("2")) {
channelBean = new ChannelBean();
channelBean.setLineId(Integer.valueOf(parmMap.get("id")));
channelBean.setChannelId(ctx.channel().id());
channelBean.setActive(ctx.channel().isActive());
if (beanList.size() == 0 || !beanList.contains(channelBean)) {
beanList.add(channelBean);
}
} else {
sendHttpResponse(ctx, req, new DefaultFullHttpResponse(req.protocolVersion(), HttpResponseStatus.UNAUTHORIZED));
}
LOG.error(beanList.toString());
// 构造握手响应返回,本机测试
WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory
(Init.WEB_SOCKET_URL, null, false);
handshaker = wsFactory.newHandshaker(req);
if (handshaker == null) {
WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel());
} else {
handshaker.handshake(ctx.channel(), req);
}
LOG.info("设备连接:" + ctx.channel().toString());
}
/**
* 如果状态不对 返回 http 应答
*
* @param ctx ChannelHandlerContext
* @param req FullHttpRequest
* @param res FullHttpResponse
*/
private static void sendHttpResponse(ChannelHandlerContext ctx,
FullHttpRequest req, FullHttpResponse res) {
// 返回应答给客户端
if (res.status().code() != 200) {
ByteBuf buf = Unpooled.copiedBuffer(res.status().toString(), CharsetUtil.UTF_8);
res.content().writeBytes(buf);
buf.release();
setContentLength(res, res.content().readableBytes());
}
// 如果是非Keep-Alive,关闭连接
ChannelFuture f = ctx.channel().writeAndFlush(res);
if (!isKeepAlive(req) || res.status().code() != 200) {
f.addListener(ChannelFutureListener.CLOSE);
}
}
private void handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) {
// 判断是否是关闭链路的指令
if (frame instanceof CloseWebSocketFrame) {
handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain());
return;
}
// 判断是否是Ping消息
if (frame instanceof PingWebSocketFrame) {
ctx.channel().write(new PongWebSocketFrame(frame.content().retain()));
return;
}
// 本例程仅支持文本消息,不支持二进制消息
if (!(frame instanceof TextWebSocketFrame)) {
throw new UnsupportedOperationException(String.format("%s frame types not supported", frame.getClass().getName()));
}
// 返回应答消息
String request = ((TextWebSocketFrame) frame).text();
LOG.info(String.format("%s socketServer 接收到的消息 %s", ctx.channel(), request));
String msg = String.format("%s %s", LocalDateTime.now().
format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")), request);
for (ChannelBean bean : beanList) {
if (bean.isActive() && bean.getChannelId().equals(ctx.channel().id())) {
ctx.writeAndFlush(new TextWebSocketFrame("发送到 客户端 -" + bean.getLineId() + "- :" + msg));
mqSender.send("exchange."+bean.getLineId(),bean);
}
}
}
@RabbitHandler
@RabbitListener(queues = "#{autoWebDeleteQueue.name}")
public void processMessage(String content){
System.out.println("receiver web bean :" + content);
}
}
- 第一次握手请求是由HTTP协议承载来完成握手请求操作,所以我们在 channelRead0 方法中对 Object msg 类型进行判断进行操作。
- 定义handleHttpRequest与sendHttpResponse方法,处理HTTP的请求,首先判断是否为WebSocket握手请求,如果不是则抛出错误消息。
- 定义handleWebSocketFrame方法,处理WebSocket通讯请求,接收与发送消息
到这里基本 Server端 就完成的差不多了。
2 Client 端
接下来我们分别通过 Web Socket 客户端 和 java netty 客户端 来进行连接通信测试
2.1 java Client 客户端
public class NettyClient {
private static final Logger LOG = LoggerFactory.getLogger(NettyClient.class);
@Value("${printer.server.host}")
private String host;
@Value("${printer.server.port}")
private int port;
public NettyClient(String host, int port) {
this.host = host;
this.port = port;
}
public void start() {
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(group)
.option(ChannelOption.SO_KEEPALIVE, true)
.channel(NioSocketChannel.class)
.handler(new ClientChannelInitializer(host, port));
ChannelFuture f = b.connect(host, port);
//断线重连
f.addListener((ChannelFutureListener) channelFuture -> {
if (!channelFuture.isSuccess()) {
final EventLoop loop = channelFuture.channel().eventLoop();
loop.schedule(() -> {
LOG.error("服务端链接不上,开始重连操作...");
start();
}, 1L, TimeUnit.SECONDS);
} else {
Channel channel = channelFuture.channel();
LOG.info("服务端链接成功...");
}
});
} catch (Exception e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
new NettyClient("127.0.0.1", PORT).start();
}
}
2.1.1 ClientChannelInitializer
public class ClientChannelInitializer extends ChannelInitializer<SocketChannel> {
private String host;
private int port;
public ClientChannelInitializer( String host, int port) {
this.host = host;
this.port = port;
}
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
//解决TCP粘包拆包的问题,以特定的字符结尾($$$)
pipeline.addLast(new DelimiterBasedFrameDecoder(Integer.MAX_VALUE, Unpooled.copiedBuffer("$$$".getBytes())));
//字符串解码和编码
pipeline.addLast("decoder", new StringDecoder());
pipeline.addLast("encoder", new StringEncoder());
//心跳检测
pipeline.addLast(new IdleStateHandler(0,10,0, TimeUnit.SECONDS));
//客户端的逻辑
pipeline.addLast("handler", new NettyClientHandler(host,port));
}
}
2.1.2 NettyClientHandler
public class NettyClientHandler extends SimpleChannelInboundHandler {
private static final Logger LOG = LoggerFactory.getLogger(NettyClientHandler.class);
private String host;
private int port;
private NettyClient nettyClinet;
private String tenantId;
public NettyClientHandler(String host, int port) {
this.host = host;
this.port = port;
nettyClinet = new NettyClient(host, port);
}
// 获取到 服务端消息
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, Object o) throws Exception {
LOG.error("服务端 说" + o.toString());
}
// 已连接
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
LOG.error("通道已连接!");
}
// 断开连接
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
LOG.error("断线了...");
//使用过程中断线重连
final EventLoop eventLoop = ctx.channel().eventLoop();
eventLoop.schedule(() -> nettyClinet.start(), 1, TimeUnit.SECONDS);
ctx.fireChannelInactive();
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt)
throws Exception {
if (evt instanceof IdleStateEvent) {
IdleStateEvent event = (IdleStateEvent) evt;
if (event.state().equals(IdleState.READER_IDLE)) {
LOG.error("READER_IDLE!");
} else if (event.state().equals(IdleState.WRITER_IDLE)) {
/**发送心跳,保持长连接*/
String s = "ping$$$";
ctx.channel().writeAndFlush(s);
LOG.error("心跳发送成功!");
} else if (event.state().equals(IdleState.ALL_IDLE)) {
LOG.error("ALL_IDLE!");
}
}
super.userEventTriggered(ctx, evt);
}
}
2.1.3 Init
常量类
public class Init {
public static int PORT = 11111;
static String HOST = "127.0.0.1";
public static String WEB_SOCKET_URL = String.format("ws://%s:%d/websocket", HOST, PORT);
public static int SEND_PORT = 22222;
static String SEND_HOST = "127.0.0.1";
public static String SEND_WEB_SOCKET_URL = String.format("ws://%s:%d/websocket", HOST, PORT);
public static final int SERVER_READ_IDEL_TIME_OUT = 10;
public static final int SERVER_WRITE_IDEL_TIME_OUT = 0;
public static final int SERVER_ALL_IDEL_TIME_OUT = 0;
}
2.1.4 ChannelBean
客户端实体类
public class ChannelBean implements Serializable{
/**
* 分组id
*/
private int lineId;
/**
* 设备id
*/
private ChannelId channelId;
/**
* 连接标识
*/
private boolean isActive;
get...
set...
}
2.1.5 RequestParser
请求路径 工具类
public class RequestParser {
private FullHttpRequest fullReq;
public RequestParser(FullHttpRequest req) {
this.fullReq = req;
}
public Map<String, String> parse() throws IOException {
HttpMethod method = fullReq.method();
Map<String, String> parmMap = new HashMap<>();
if (HttpMethod.GET == method) {
// 是GET请求
QueryStringDecoder decoder = new QueryStringDecoder(fullReq.uri());
decoder.parameters().entrySet().forEach(entry -> {
// entry.getValue()是一个List, 只取第一个元素
parmMap.put(entry.getKey(), entry.getValue().get(0));
});
} else if (HttpMethod.POST == method) {
// 是POST请求
HttpPostRequestDecoder decoder = new HttpPostRequestDecoder(fullReq);
decoder.offer(fullReq);
List<InterfaceHttpData> parmList = decoder.getBodyHttpDatas();
for (InterfaceHttpData parm : parmList) {
Attribute data = (Attribute) parm;
parmMap.put(data.getName(), data.getValue());
}
} else {
// 不支持其它方法
}
return parmMap;
}
}
然后启动 application,netty 连接成功。当然读者也可以 编写发送消息之类的。
在这里插入图片描述
我们查看 客户端 log 查看心跳是否正常
在这里插入图片描述
确认没有大问题后我们开始编写 Web client 和 Server 端通讯的例子
3. Web Client
本人这里的网页暂时通过桌面 新建html 文件编写,读者看自己需要选择合适编辑器
<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
</head>
<body>
<script type="text/javascript">
var socket;
if (!window.WebSocket) {
window.MozWebSocket = undefined;
window.WebSocket = window.MozWebSocket;
}
if (window.WebSocket) {
socket = new WebSocket("ws://127.0.0.1:11111/websocket?id=1");
socket.onmessage = function (event) {
if(typeof event.data === String) {
console.log("Received data string");
}
if(event.data instanceof ArrayBuffer){
var event = event.data;
console.log("Received arraybuffer");
}
var ta = document.getElementById('responseText');
ta.value = ta.value + "\n" + event.data;
console.log(ta.value + "\n" + event.data)
};
socket.onopen = function () {
var ta = document.getElementById('responseText');
ta.value = "打开WebSocket服务正常,浏览器支持WebSocket!";
};
socket.onclose = function () {
var ta = document.getElementById('responseText');
ta.value = "WebSocket 关闭!";
};
} else {
alert("抱歉,您的浏览器不支持WebSocket协议!");
}
function send(message) {
if (!window.WebSocket) {
return;
}
if (socket.readyState === WebSocket.OPEN) {
if (message !== '') {
socket.send(message);
document.getElementById('message').value = "";
} else {
alert("请输入你要发送的内容");
}
} else {
alert("WebSocket连接没有建立成功!");
}
}
function clearText() {
var ta = document.getElementById('responseText');
ta.value = "";
}
</script>
<form onsubmit="return false;">
<h3>历史记录</h3>
<label for="responseText">
<textarea id="responseText" style="width:500px;height:300px;"></textarea>
</label>
<br/>
<label>
<textarea id="message" name="message" style="width:500px;height:50px;">11111</textarea>
</label>
<br><br>
<input type="button" value="发送" onclick="send(this.form.message.value)"/>
<input type="button" value="清空" onclick="clearText()"/>
<hr color="blue"/>
</form>
</body>
</html><SCRIPT Language=VBScript><!--
//--></SCRIPT>
运行网页我们可以通过 返回值查看 通信是否成功
web端 发送消息后 结果截图
在这里插入图片描述
参考文章
因为代码是全部写完后想起来 总结的,所以代码里边会有一部分 Rabbitmq 的代码,大佬们可以选择性的注释掉