SpringBoot整合RabbitMQ
2023-01-04 本文已影响0人
WebGiser
参考文章:https://blog.csdn.net/qq_35387940/article/details/100514134
Direct Exchange
直连型交换机,根据消息携带的路由键将消息投递给对应队列。
大致流程,有一个队列绑定到一个直连交换机上,同时赋予一个路由键 routing key 。
然后当一个消息携带着路由值为X,这个消息通过生产者发送给交换机时,交换机就会根据这个路由值X去寻找绑定值也是X的队列。
Fanout Exchange
扇型交换机,这个交换机没有路由键概念,就算你绑了路由键也是无视的。 这个交换机在接收到消息后,会直接转发到绑定到它上面的所有队列。
Topic Exchange
主题交换机,这个交换机其实跟直连交换机流程差不多,但是它的特点就是在它的路由键和绑定键之间是有规则的。
简单地介绍下规则:
* (星号) 用来表示一个单词 (必须出现的)
# (井号) 用来表示任意数量(零个或多个)单词
通配的绑定键是跟队列进行绑定的,举个小例子
队列Q1 绑定键为 *.TT.* 队列Q2绑定键为 TT.#
如果一条消息携带的路由键为 A.TT.B,那么队列Q1将会收到;
如果一条消息携带的路由键为TT.AA.BB,那么队列Q2将会收到;
主题交换机是非常强大的,为啥这么膨胀?
当一个队列的绑定键为 "#"(井号) 的时候,这个队列将会无视消息的路由键,接收所有的消息。
当 * (星号) 和 # (井号) 这两个特殊字符都未在绑定键中出现的时候,此时主题交换机就拥有的直连交换机的行为。
所以主题交换机也就实现了扇形交换机的功能,和直连交换机的功能。
另外还有 Header Exchange 头交换机 ,Default Exchange 默认交换机,Dead Letter Exchange 死信交换机,这几个该篇暂不做讲述。
项目结构
image.pngpom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.2.1.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.wzf</groupId>
<artifactId>websocket</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>websocket</name>
<description>websocket</description>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
<version>2.2.1.RELEASE</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
application.properties
spring.application.name=websocket-test
server.port=8888
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=123456
测试控制类RabbitController.java
package com.wzf.websocket.controller;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
@RestController
@RequestMapping("/rabbit")
public class RabbitController {
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("/sendDirectMsg")
public String sendDirectMsg(){
Map<String, Object> msg = new HashMap<>();
msg.put("id", UUID.randomUUID().toString());
msg.put("data", "sendDirectMsg");
rabbitTemplate.convertAndSend("TestDirectExchange", "TestDirectRouting", msg);
return "OK";
}
@GetMapping("/sendTopicMsg")
public String sendTopicMsg(){
Map<String, Object> msg = new HashMap<>();
msg.put("id", UUID.randomUUID().toString());
msg.put("data", "sendTopicMsg: man");
rabbitTemplate.convertAndSend("TestTopicExchange", "topic.man", msg);
return "OK";
}
@GetMapping("/sendTopicMsg2")
public String sendTopicMsg2(){
Map<String, Object> msg = new HashMap<>();
msg.put("id", UUID.randomUUID().toString());
msg.put("data", "sendTopicMsg: woman");
rabbitTemplate.convertAndSend("TestTopicExchange", "topic.#", msg);
return "OK";
}
@GetMapping("/sendFanoutMsg")
public String sendFanoutMsg(){
Map<String, Object> msg = new HashMap<>();
msg.put("id", UUID.randomUUID().toString());
msg.put("data", "sendFanoutMsg");
rabbitTemplate.convertAndSend("TestFanoutExchange", null, msg);
return "OK";
}
}
直连型交换机
DirectRabbitConfig.java
package com.wzf.websocket.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* 直连型交换机
*/
@Configuration
public class DirectRabbitConfig {
// 队列起名:TestDirectQueue
@Bean
public Queue TestDirectQueue(){
return new Queue("TestDirectQueue", true);
}
// 交换机起名:TestDirectExchange
@Bean
public DirectExchange TestDirectExchange(){
return new DirectExchange("TestDirectExchange", true, false);
}
// 绑定:将队列和交换机绑定,并设置匹配键 TestDirectRouting
@Bean
public Binding bindingDirect(){
return BindingBuilder.bind(TestDirectQueue()).to(TestDirectExchange()).with("TestDirectRouting");
}
@Bean
public DirectExchange lonelyDirectExchange(){
return new DirectExchange("lonelyDirectExchange");
}
}
DirectReceiver.java
package com.wzf.websocket.component;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.util.Map;
/**
* 直连交换机是采用轮询的方式对消息进行消费,而且不存在重复消费
*/
@Component
@RabbitListener(queues = "TestDirectQueue")
public class DirectReceiver {
@RabbitHandler
public void process(Map msg) {
System.out.println("DirectReceiver接收到的消息:" + msg.toString());
}
}
DirectReceiver2.java
package com.wzf.websocket.component;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.util.Map;
/**
* 直连交换机是采用轮询的方式对消息进行消费,而且不存在重复消费
*/
@Component
@RabbitListener(queues = "TestDirectQueue")
public class DirectReceiver2 {
@RabbitHandler
public void process(Map msg) {
System.out.println("DirectReceiver2接收到的消息:" + msg.toString());
}
}
效果
image.png主题交换机
TopicRabbitConfig.java
package com.wzf.websocket.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* 主题型交换机
*/
@Configuration
public class TopicRabbitConfig {
@Bean
public Queue manTopicQueue(){
return new Queue("topic.man", true);
}
@Bean
public Queue womanTopicQueue(){
return new Queue("topic.woman", true);
}
@Bean
public TopicExchange TestTopicExchange(){
return new TopicExchange("TestTopicExchange", true, false);
}
@Bean
public Binding bindingMan(){
return BindingBuilder.bind(manTopicQueue()).to(TestTopicExchange()).with("topic.man");
}
@Bean
public Binding bindingTotal(){
return BindingBuilder.bind(womanTopicQueue()).to(TestTopicExchange()).with("topic.#");
}
}
TopicManReceiver.java
package com.wzf.websocket.component;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.util.Map;
/**
* 主题交换机
*/
@Component
@RabbitListener(queues = "topic.man")
public class TopicManReceiver {
@RabbitHandler
public void process(Map msg) {
System.out.println("TopicManReceiver接收到的消息:" + msg.toString());
}
@RabbitHandler
public void process2(String msg) {
System.out.println("TopicManReceiver接收到的消息:" + msg);
}
}
TopicTotalReceiver.java
package com.wzf.websocket.component;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.util.Map;
/**
* 主题交换机
*/
@Component
@RabbitListener(queues = "topic.woman")
public class TopicTotalReceiver {
@RabbitHandler
public void process(Map msg) {
System.out.println("TopicTotalReceiver接收到的消息:" + msg.toString());
}
@RabbitHandler
public void process2(String msg) {
System.out.println("TopicTotalReceiver接收到的消息:" + msg);
}
}
效果
image.png扇形交换机
FanoutRabbitConfig.java
package com.wzf.websocket.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* 扇形交换机
*/
@Configuration
public class FanoutRabbitConfig {
@Bean
public Queue aFanoutQueue() {
return new Queue("fanout.A", true);
}
@Bean
public Queue bFanoutQueue() {
return new Queue("fanout.B", true);
}
@Bean
public Queue websocketFanoutQueue() {
return new Queue("fanout.websocket", true);
}
@Bean
public FanoutExchange TestFanoutExchange() {
return new FanoutExchange("TestFanoutExchange", true, false);
}
@Bean
public Binding bindingFanoutA() {
return BindingBuilder.bind(aFanoutQueue()).to(TestFanoutExchange());
}
@Bean
public Binding bindingFanoutB() {
return BindingBuilder.bind(bFanoutQueue()).to(TestFanoutExchange());
}
@Bean
public Binding bindingFanoutWebsocket() {
return BindingBuilder.bind(websocketFanoutQueue()).to(TestFanoutExchange());
}
}
FanoutReceiverA.java
package com.wzf.websocket.component;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.util.Map;
/**
* 扇形交换机
*/
@Component
@RabbitListener(queues = "fanout.A")
public class FanoutReceiverA {
@RabbitHandler
public void process(Map msg) {
System.out.println("FanoutReceiverA接收到的消息:" + msg.toString());
}
@RabbitHandler
public void process2(String msg) {
System.out.println("FanoutReceiverA接收到的消息:" + msg);
}
}
FanoutReceiverB.java
package com.wzf.websocket.component;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.util.Map;
/**
* 扇形交换机
*/
@Component
@RabbitListener(queues = "fanout.B")
public class FanoutReceiverB {
@RabbitHandler
public void process(Map msg) {
System.out.println("FanoutReceiverB接收到的消息:" + msg.toString());
}
@RabbitHandler
public void process2(String msg) {
System.out.println("FanoutReceiverB接收到的消息:" + msg);
}
}
效果
image.pngwebsocket
WebSocketConfig.java
package com.wzf.websocket.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;
@Configuration
public class WebSocketConfig {
@Bean
public ServerEndpointExporter serverEndpointExporter() {
return new ServerEndpointExporter();
}
}
WebSocketServer.java
package com.wzf.websocket.component;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.concurrent.CopyOnWriteArraySet;
@Slf4j
@Component
@ServerEndpoint("/websocket/{sid}")
public class WebSocketServer {
//静态变量,用来记录当前在线连接数。应该把它设计成线程安全的。
private static int onlineCount = 0;
//concurrent包的线程安全Set,用来存放每个客户端对应的MyWebSocket对象。
private static CopyOnWriteArraySet<WebSocketServer> webSocketSet = new CopyOnWriteArraySet<WebSocketServer>();
//与某个客户端的连接会话,需要通过它来给客户端发送数据
private Session session;
//接收sid
private String sid="";
/**
* 连接建立成功调用的方法*/
@OnOpen
public void onOpen(Session session,@PathParam("sid") String sid) throws InterruptedException {
this.session = session;
webSocketSet.add(this); //加入set中
addOnlineCount(); //在线数加1
log.info("有新窗口开始监听:"+sid+",当前在线人数为" + getOnlineCount());
this.sid=sid;
try {
//模拟服务器向客户端发消息
sendMessage("服务器:恭喜你,连接成功");
} catch (IOException e) {
log.error("websocket IO异常");
}
}
/**
* 连接关闭调用的方法
*/
@OnClose
public void onClose() {
webSocketSet.remove(this); //从set中删除
subOnlineCount(); //在线数减1
log.info("有一连接关闭!当前在线人数为" + getOnlineCount());
}
/**
* 收到客户端消息后调用的方法
*
* @param message 客户端发送过来的消息*/
@OnMessage
public void onMessage(String message, Session session) {
log.info("收到来自窗口"+sid+"的信息:"+message);
// for (WebSocketServer item : webSocketSet) {
// try {
// item.sendMessage(message);
// } catch (IOException e) {
// e.printStackTrace();
// }
// }
}
/**
*连接发生错误时
* @param session
* @param error
*/
@OnError
public void onError(Session session, Throwable error) {
log.error("发生错误");
error.printStackTrace();
}
/**
* 实现服务器主动推送
*/
public void sendMessage(String message) throws IOException {
if(this.session != null){
this.session.getBasicRemote().sendText(message);
}
}
/**
* 实现服务器主动向某一客户端推送消息
* */
public void sendInfo(String message,@PathParam("sid") String sid) throws IOException {
log.info("推送消息到窗口"+sid+",推送内容:"+message);
for (WebSocketServer item : webSocketSet) {
try {
if(sid == null){
item.sendMessage(message);
}else if(item.sid.equals(sid)){
item.sendMessage(message);
}
} catch (IOException e) {
continue;
}
}
}
public static synchronized int getOnlineCount() {
return onlineCount;
}
public static synchronized void addOnlineCount() {
WebSocketServer.onlineCount++;
}
public static synchronized void subOnlineCount() {
WebSocketServer.onlineCount--;
}
}
WebsocketReceiver.java
package com.wzf.websocket.component;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.Map;
/**
* websocket消费消息对垒的消息
*/
@Component
@RabbitListener(queues = "fanout.websocket")
public class WebsocketReceiver {
@Autowired
private WebSocketServer webSocketServer;
@RabbitHandler
public void process(Map msg) throws IOException {
if(msg.get("sid") != null){
webSocketServer.sendInfo(msg.toString(), msg.get("sid").toString());
}else{
webSocketServer.sendInfo(msg.toString(), null);
}
}
@RabbitHandler
public void process2(String msg) throws IOException {
webSocketServer.sendInfo(msg, null);
}
}
前端
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta http-equiv="X-UA-Compatible" content="IE=edge">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>websocket</title>
</head>
<body>
<script src="./Stomp.js"></script>
<script>
// 通过java websocket消费消息
// let url = "ws://127.0.0.1:8888/websocket/1";
// let socket = new WebSocket(url);
// socket.onopen = function (){
// console.log("连接成功!");
// socket.send("客户端发送的消息");
// };
// socket.onmessage = function (msg){
// console.log(msg.data);
// };
// socket.onerror = function (error){
// console.log(error);
// };
// 直接消费rabbitmq的消息
let url = "ws://127.0.0.1:15674/ws";
let client = Stomp.client(url);
client.connect("admin","123456",function(){
console.log("连接成功!");
// 直接监听某个队列消息:/amq/queue/queue名称
let sub = client.subscribe("/amq/queue/TestDirectQueue",function(message){
console.log("TestDirectQueue接收到消息:", message);
})
// 直连交换机: /exchange/exchange名称/routingKey
let sub1 = client.subscribe("/exchange/TestDirectExchange/TestDirectRouting",function(message){
console.log("TestDirectExchange接收到消息:", message);
})
// 主题交换机: /exchange/exchange名称/routingKey
let sub2 = client.subscribe("/exchange/TestTopicExchange/topic.man",function(message){
console.log("TestTopicExchange接收到消息:", message);
})
// 扇形交换机: /exchange/exchange名称/routingKey
let sub3 = client.subscribe("/exchange/TestFanoutExchange",function(message){
console.log("TestFanoutExchange接收到消息:", message);
})
}, function(error){
console.log(error);
});
</script>
</body>
</html>
效果
image.pngimage.png