RabbitMQ总结
2019-01-15 本文已影响0人
wingedsnake
这里只做个人技术笔录,不搞一堆的废话
官网中提示:RabbitMQ实现了多种协议。此处测试使用AMQP 0-9-1
协议。使用Java语言(通用Java代码,非Spring AMQP)
RabbitMQ官方文档中介绍了六种工作模式:
- 点对点模式
- 工作队列模式(Work queues)
- 发布/订阅模式(Publish/Subscribe)
- 路由模式(Routing)
- 主题模式(Topics)
- 远程过程调用模式(RPC)
接下来对各个工作模式做对应的测试及简单的相关理论记录。
假设已开启本地RabbitMQ服务,且端口为默认端口5672
。
点对点模式
即最简单的单项点对点生产者消费者模式。
点对点模式模型(引用自官网)
队列,即消息缓冲器。RabbitMQ中传输的消息数据存放在队列中
消费者,即消息接收端
消息点对点模式模型图
程序测试流程:
发布者连接到RabbitMQ服务后,声明队列并将消息存入队列中发送并退出。接收端连接RabbitMQ后通过通道从对应的队列中取出消息信息并打印。
发布者测试代码:
ConnectionFactory cFactory = new ConnectionFactory();
cFactory.setHost("localhost");
String message = "你好!我的第一个rabbitMQ 队列应用。";
try(Connection newConnection = cFactory.newConnection();
Channel createChannel = newConnection.createChannel()) {
//通过构建的通道声明一个队列
createChannel.queueDeclare(QUEUE_NAME, false, false, false, null);
//然后在通道上创建一个发布,将消息通过队列发布上去
createChannel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
接收端测试代码:
try {
ConnectionFactory cFactory = new ConnectionFactory();
cFactory.setHost("localhost");
Connection newConnection = cFactory.newConnection();
Channel channel = newConnection.createChannel();
String QUEUE_NAME = "hello";
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
//队列回传
DeliverCallback deliverCallBack = new DeliverCallback() {
@Override
public void handle(String consumerTag, Delivery delivery) throws IOException {
Envelope envelope = delivery.getEnvelope();
byte[] body = delivery.getBody();
BasicProperties properties = delivery.getProperties();
//
System.err.println("envelope-getDeliveryTag:" + envelope.getDeliveryTag());
System.err.println("envelope-getExchange:" + envelope.getExchange());
System.err.println("envelope-getRoutingKey:" + envelope.getRoutingKey());
//获取消息体
String message = new String(body,"UTF-8");
System.err.println("message: " + message);
}
};
String basicConsume = channel.basicConsume(QUEUE_NAME, true, deliverCallBack, CancelCallback -> {});
System.out.println(":-------basicConsume--------:" + basicConsume);
} catch (Exception e) {
e.printStackTrace();
}
工作队列模式(Work queues)
即选择分发。当接收端存在多个,且指定接收端正在处理未处理完的业务时分发给匹配的空闲接收端。
工作队列模式的模型(引用自官网)
程序测试流程:
发布者连接到RabbitMQ服务后,声明队列并将消息存入队列中发送并退出。启动多个接收端,连接RabbitMQ服务,通过通道获取对应的队列。当某个接收端在处理业务时队列中的消息分发给空闲的接收端,处理消息并打印消息。
发布者测试代码:
Scanner scanner = new Scanner(System.in);
while(scanner.hasNext()) {
//通过命令行获取消息文本
String nextLine = scanner.nextLine();
//要发送的消息
String message = String.join(" ",nextLine);
//队列名称
String QUEUE_NAME = "QUEUE_NAME";
ConnectionFactory cFactory = new ConnectionFactory();
cFactory.setHost("localhost");
try(Connection newConnection = cFactory.newConnection();
Channel channel = newConnection.createChannel()){
//创建队列:可持久化的队列(durable=true)
channel.queueDelete("QUEUE_NAME");
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
}catch (Exception e) {
e.printStackTrace();
}
}
接收端测试代码:
import java.io.IOException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.Delivery;
public class QueuesCustomer {
public static void main(String[] args) {
ConnectionFactory cFactory = new ConnectionFactory();
cFactory.setHost("localhost");
try{
Connection conn = cFactory.newConnection();
Channel channel = conn.createChannel();
//设置消费端每次获取数量为1
channel.basicQos(1);
//队列名称
String QUEUE_NAME ="QUEUE_NAME";
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
//消息回调函数
DeliverCallback deliverCallback = new DeliverCallback() {
@Override
public void handle(String consumerTag, Delivery delivery) throws IOException {
String message = new String(delivery.getBody(),"UTF-8");
System.out.println(" [x] Received '" + message + "'");
try {
doWork(message);
}catch(Exception e) {
e.printStackTrace();
}finally {
System.out.println("[x] Done");
//关闭自动确认信息,改为手动确认信息
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
}
};
boolean autoAck = false;
channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback, CancelCallback -> {});
}catch(Exception e) {
e.printStackTrace();
}
}
protected static void doWork(String message) throws Exception {
for (char ch: message.toCharArray()) {
//模拟延时业务,让线程睡眠5秒
if (ch == '.') Thread.sleep(5000);
}
}
}
发布/订阅模式(Publish/Subscribe)
即发布的消息广播给所有接收者。接收者同时收到消息并各自处理。
发布/订阅模式的模型(引用自官网)
程序测试流程:
发布者连接到RabbitMQ服务后,声明交换器(exchange)将消息发布至交换器中。启动多个接收端,连接RabbitMQ服务,通过通道各自声明一个临时队列。然后绑定队列至交换器上,从临时队列中获取消息并打印。(
官方文档中写明:如果不声明交换器,则使用默认交换器。所以前俩种模式示例未声明交换器则是声明了默认交换器
)
发布端测试代码:
import java.util.Scanner;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class Publisher {
public static void main(String[] args) {
//从键盘获取数据
Scanner scanner = new Scanner(System.in);
while(scanner.hasNext()) {
String nextLine = scanner.nextLine();
//创建连接工厂
ConnectionFactory cFactory = new ConnectionFactory();
//通过连接工厂创建连接(Cnnectoin)及通道(Channel)
try(Connection conn = cFactory.newConnection();
Channel channel = conn.createChannel()){
//创建交易所(exchange):通过管道创建
channel.exchangeDeclare("logs", "fanout");
String message = nextLine;
//将数据发布到交易所中
channel.basicPublish("logs","", null, message.getBytes());
}catch (Exception e) {
e.printStackTrace();
}
}
}
}
订阅端测试代码:
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.Delivery;
public class Subscribe {
public static void main(String[] args) {
//创建工厂
ConnectionFactory cFactory = new ConnectionFactory();
//创建连接
//创建通道
try {
Connection connection = cFactory.newConnection();
Channel channel = connection.createChannel();
String queue = channel.queueDeclare().getQueue();
//如果有则订阅到交易所上,如果没有则创建一个
channel.exchangeDeclare("logs", "fanout");
//将队列绑定到交易所
channel.queueBind(queue, "logs", "");
DeliverCallback deliverCallback = (String consumerTag, Delivery message) -> {
System.err.println(queue + ":---> " + new String(message.getBody(),"UTF-8"));
};
channel.basicConsume(queue,deliverCallback,CancelCallback->{});
}catch (Exception e) {
e.printStackTrace();
}
}
}
路由模式(Routing)
即发布端来指定分发的接收端。
路由模式的模型(引用自官网)
程序测试流程:
发布者连接到RabbitMQ服务后,声明交换器(exchange)将消息发布至交换器中并指定接收者。启动多个接收端,连接RabbitMQ服务,通过通道各自声明一个临时队列。然后绑定队列至交换器上,指定的接收者从临时队列中获取消息并打印。
发布端测试代码:
import java.util.Scanner;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
* 发布者:路由方式
* @author zhengjie
*/
public class PublicsherServerity {
private static final String EXCHANGE_NAME = "logs";
public static void main(String[] args) {
//从命令提示符中获取消息(消息格式: 日志级别:日志内容)
Scanner scanner = new Scanner(System.in);
//创建连接工厂
ConnectionFactory cFactory = new ConnectionFactory();
cFactory.setHost("localhost");
while(scanner.hasNext()) {
//键盘中获取的消息
String nextLine = scanner.nextLine();
//如果字符串不存在':'则跳过,直接再次重头再来
if(!nextLine.contains(":")) {
continue;
}
String[] split = nextLine.split(":");
//创建连接及通道,并通过JDK1.7自动关闭特性简化代码
try(Connection conn = cFactory.newConnection();
Channel channel = conn.createChannel()){
//创建交换器
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
channel.basicPublish(EXCHANGE_NAME,split[0], null, split[1].getBytes("UTF-8"));
}catch (Exception e) {
e.printStackTrace();
}
}
}
}
订阅者A:
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
/**
* 订阅者A: 日记级别: error
* @author zhengjie
*/
public class SubscribeA {
private static final String EXCHANGE_NAME = "logs";
public static void main(String[] args) {
//创建连接工厂
ConnectionFactory cFactory = new ConnectionFactory();
cFactory.setHost("localhost");
try {
//获取连接
Connection connection = cFactory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME,"direct");
//随机生成队列编号
String queue = channel.queueDeclare().getQueue();
//绑定队列与交换器
channel.queueBind(queue, EXCHANGE_NAME, "error");
//获取数据的回调函数: 采用Labda表达式
DeliverCallback deliverCallback = (consumerTag, delivery)->{
String routingKey = delivery.getEnvelope().getRoutingKey();
System.out.println("message _ " + routingKey + " : " + new String(delivery.getBody(),"UTF-8"));
};
channel.basicConsume(queue,true, deliverCallback, CancelCallback -> {});
}catch(Exception e) {
e.printStackTrace();
}
}
}
订阅者B:
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
/**
* 订阅者B: 日志级别: debug
*
* @author zhengjie
*/
public class SubscribeB {
private static final String EXCHANGE_NAME = "logs";
public static void main(String[] args) {
// 创建连接工厂
ConnectionFactory cFactory = new ConnectionFactory();
cFactory.setHost("localhost");
try {
// 获取连接
Connection connection = cFactory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
// 随机生成队列编号
String queue = channel.queueDeclare().getQueue();
// 绑定队列与交换器
channel.queueBind(queue, EXCHANGE_NAME, "debug");
// 获取数据的回调函数: 采用Labda表达式
DeliverCallback deliverCallback = (customTag, message) -> {
System.out.println("message _ debug : " + new String(message.getBody(), "UTF-8"));
};
channel.basicConsume(queue,true, deliverCallback, CancelCallback -> {
});
} catch (Exception e) {
e.printStackTrace();
}
}
}
主题模式(Topics)
即通过通配符来选择指定接收端集合(发送到主题交换的邮件不能具有任意路由密钥: 必须是由点分隔的单词列表)。
主题模式的模型(引用自官网)
程序测试流程:
发布者连接到RabbitMQ服务后,声明交换器(exchange)将消息发布至交换器中并通过通配符指定接收者集合(必须是由点分隔的单词列表:
\* (星号) 只能替换一个词 | # (井号) 可以替换零个或多个单词。
)。启动多个接收端,连接RabbitMQ服务,通过通道各自声明一个临时队列。然后绑定队列至交换器上,指定的接收者集合从临时队列中获取消息并打印。发布端测试代码:
import java.util.Scanner;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
* 发布者
* @author zhengjie
*/
public class Publisher {
private static final String EXCHANGE_NAME = "topic";
public static void main(String[] args) {
//从键盘上输入:
Scanner scanner = new Scanner(System.in);
while(scanner.hasNext()) {
String msg = scanner.nextLine();
String[] split = msg.split(":");
//创建RabbitMQ连接工厂
ConnectionFactory cFactory = new ConnectionFactory();
try(Connection conn = cFactory.newConnection();
Channel channel = conn.createChannel()){
//创建交换器
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
String routingKey = split[0];
byte[] message = split[1].getBytes();
channel.basicPublish(EXCHANGE_NAME, routingKey, null, message);
}catch(Exception e) {
e.printStackTrace();
}
}
}
}
订阅端A测试代码:
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
/**
* 订阅者A: 匹配: *.good.*
* @author zhengjie
*/
public class SubscribeA {
private static final String EXCHANGE_NAME = "topic";
public static void main(String[] args) {
ConnectionFactory cFactory = new ConnectionFactory();
try{
Connection conn = cFactory.newConnection();
Channel channel = conn.createChannel();
//创建交换器
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
//创建临时队列
String queue = channel.queueDeclare().getQueue();
//绑定临时队列与交换器
channel.queueBind(queue, EXCHANGE_NAME, "*.good.*");
//返回消息回调
DeliverCallback deliverCallback = (customTag,deliverVal) -> {
System.out.println(deliverVal.getEnvelope().getRoutingKey() + ": "
+ new String(deliverVal.getBody(),"UTF-8"));
};
channel.basicConsume(queue, true, deliverCallback, CancelCallback -> {});
}catch(Exception e) {
e.printStackTrace();
}
}
}
订阅端B测试代码:
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
/**
* 订阅者B: 匹配: good.#
*
* @author zhengjie
*/
public class SubscribeB {
private static final String EXCHANGE_NAME = "topic";
public static void main(String[] args) {
ConnectionFactory cFactory = new ConnectionFactory();
try {
Connection conn = cFactory.newConnection();
Channel channel = conn.createChannel();
// 创建交换器
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
// 创建临时队列
String queue = channel.queueDeclare().getQueue();
// 绑定临时队列与交换器
channel.queueBind(queue, EXCHANGE_NAME, "good.#");
// 返回消息回调
DeliverCallback deliverCallback = (customTag, deliverVal) -> {
System.out.println(
deliverVal.getEnvelope().getRoutingKey() + ": " + new String(deliverVal.getBody(), "UTF-8"));
};
channel.basicConsume(queue, true, deliverCallback, CancelCallback -> {
});
} catch (Exception e) {
e.printStackTrace();
}
}
}
远程过程调用模式(RPC)
即客户端发送请求消息,服务器回复响应消息。
远程过程调用模型(引用自官网)
程序测试流程:
客户端发送带有两个属性的消息: replyTo字段,设置为仅为请求创建的匿名独占队列;correlationId字段,设置为每个请求的唯一值。请求被发送到指定队列中。服务端则等待请求,当出现请求时,它会执行该作业,并使用来自replyTo字段的队列将带有结果的消息发送回客户端。
服务端测试代码:
import com.rabbitmq.client.*;
public class RPCServer {
private static final String RPC_QUEUE_NAME = "rpc_queue";
private static int fib(int n) {
if (n == 0) return 0;
if (n == 1) return 1;
return fib(n - 1) + fib(n - 2);
}
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);
channel.queuePurge(RPC_QUEUE_NAME);
channel.basicQos(1);
System.out.println(" [x] Awaiting RPC requests");
Object monitor = new Object();
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
AMQP.BasicProperties replyProps = new AMQP.BasicProperties
.Builder()
.correlationId(delivery.getProperties().getCorrelationId())
.build();
String response = "";
try {
String message = new String(delivery.getBody(), "UTF-8");
int n = Integer.parseInt(message);
System.out.println(" [.] fib(" + message + ")");
response += fib(n);
} catch (RuntimeException e) {
System.out.println(" [.] " + e.toString());
} finally {
channel.basicPublish("", delivery.getProperties().getReplyTo(), replyProps, response.getBytes("UTF-8"));
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
// RabbitMq consumer worker thread notifies the RPC server owner thread
synchronized (monitor) {
monitor.notify();
}
}
};
channel.basicConsume(RPC_QUEUE_NAME, false, deliverCallback, (consumerTag -> { }));
// Wait and be prepared to consume the message from RPC client.
while (true) {
synchronized (monitor) {
try {
monitor.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
}
客户端测试代码:
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeoutException;
public class RPCClient implements AutoCloseable {
private Connection connection;
private Channel channel;
private String requestQueueName = "rpc_queue";
public RPCClient() throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
connection = factory.newConnection();
channel = connection.createChannel();
}
public static void main(String[] argv) {
try (RPCClient fibonacciRpc = new RPCClient()) {
for (int i = 0; i < 32; i++) {
String i_str = Integer.toString(i);
System.out.println(" [x] Requesting fib(" + i_str + ")");
String response = fibonacciRpc.call(i_str);
System.out.println(" [.] Got '" + response + "'");
}
} catch (IOException | TimeoutException | InterruptedException e) {
e.printStackTrace();
}
}
public String call(String message) throws IOException, InterruptedException {
final String corrId = UUID.randomUUID().toString();
String replyQueueName = channel.queueDeclare().getQueue();
AMQP.BasicProperties props = new AMQP.BasicProperties
.Builder()
.correlationId(corrId)
.replyTo(replyQueueName)
.build();
channel.basicPublish("", requestQueueName, props, message.getBytes("UTF-8"));
final BlockingQueue<String> response = new ArrayBlockingQueue<>(1);
String ctag = channel.basicConsume(replyQueueName, true, (consumerTag, delivery) -> {
if (delivery.getProperties().getCorrelationId().equals(corrId)) {
response.offer(new String(delivery.getBody(), "UTF-8"));
}
}, consumerTag -> {
});
String result = response.take();
channel.basicCancel(ctag);
return result;
}
public void close() throws IOException {
connection.close();
}
}
spring AMQP简单测试用代码
Service业务代码:
//service接口
import com.rabbitmq.bean.User;
public interface RabbitMqService {
//发送消息
void sendMsg(String msg);
//接收用户消息
void sendUser(User user);
}
//service实现
@Service
public class RabbitMqServiceImpl implements ConfirmCallback, RabbitMqService {
@Value("${rabbitmq.queue.msg}")
private String msgRouting = null;
@Value("${rabbitmq.queue.user}")
private String userRouting = null;
@Autowired
private RabbitTemplate rabbitTemplate = null;
//发送消息
@Override
public void sendMsg(String msg) {
rabbitTemplate.setConfirmCallback(this);
rabbitTemplate.convertAndSend(msgRouting,msg);
}
@Override
public void sendUser(User user) {
rabbitTemplate.setConfirmCallback(this);
rabbitTemplate.convertAndSend(userRouting, user);
}
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if(ack) {
System.out.println("----消息确认送达成功----");
}else {
System.out.println("----消息确认送达失败----" + cause);
}
}
}
service层实现ConfirmCallback了接口,这样当服务端接收数据确认后会产生回调执行confirm
方法。
配置Bean:
import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class BeanConfig {
@Value("${rabbitmq.queue.msg}")
private String msgQueueName = null;
@Value("${rabbitmq.queue.user}")
private String userQueueName = null;
//创建字符串消息队列
//创建的队列对象为持久队列,则可以序列化到磁盘中,服务器重启不受影响。
@Bean
public Queue createQueueMsg() {
return new Queue(msgQueueName,true);
}
//创建对象消息队列
@Bean
public Queue createQueueUser() {
return new Queue(userQueueName,true);
}
}
spring容器中存在的Queue
对象会自动注册至rabbitmq中。
服务端接收逻辑:
@Component
public class RabbitMessageReceiver {
//定义监听字符串队列名称
@RabbitListener(queues= {"${rabbitmq.queue.msg}"})
public void receiveMsg(String msg) {
System.out.println("----收到消息:----" + msg);
}
//定义监听对象列名称
@RabbitListener(queues= {"${rabbitmq.queue.user}"})
public void receiveObj(User user) {
System.out.println("----收到对象:----" + user);
}
}
服务器通过@RabbitListener注解绑定对应的队列,当服务器接收到客户端发送的消息后执行指定的业务逻辑。
实体对象:
import java.io.Serializable;
//因为队列为持久队列,所以传输的对象需要序列化
public class User implements Serializable{
private Integer id;
private String userName;
private String userPass;
public Integer getId() {
return id;
}
public void setId(Integer id) {
this.id = id;
}
public String getUserName() {
return userName;
}
public void setUserName(String userName) {
this.userName = userName;
}
public String getUserPass() {
return userPass;
}
public void setUserPass(String userPass) {
this.userPass = userPass;
}
@Override
public String toString() {
return "User [id=" + id + ", userName=" + userName + ", userPass=" + userPass + "]";
}
}
测试总测试了发送/接收实体对象,所以创建了一个测试实体类。
客户端请求Controller:
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import com.rabbitmq.bean.User;
import com.rabbitmq.service.RabbitMqService;
@Controller
public class RabbitMqController {
@Autowired
private RabbitMqService raiService;
//发送字符串请求
@RequestMapping("/str")
@ResponseBody
public String sendMsg(String msg) {
raiService.sendMsg(msg);
return "msg";
}
//发送对象请求
@RequestMapping("/user")
@ResponseBody
public String sendUser(User user) {
raiService.sendUser(user);
return "usr";
}
}
RabbitMQ的spring boot测试配置:
# 配置rabbitmq服务的主机地址
spring.rabbitmq.host=localhost
# 配置rabbitmq服务端口
spring.rabbitmq.port=5672
# rabbitmq发布者消息是否自动确认
spring.rabbitmq.publisher-confirms=true
# 自定义的配置
rabbitmq.queue.msg=rabbitmq.msg
rabbitmq.queue.user=abbitmq.entity
# 配置rabbitmq密码,这里没有密码
# spring.rabbitmq.password=
# 配置rabbitmq账户,这里没有账户
# spring.rabbitmq.username=
Maven项目依赖(Spring boot Pom.xml部分):
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
</dependency>