【RabbitMQ】Spring-Boot 整合 使用教程 可靠
2019-02-27 本文已影响0人
植富宝
==1. 简介==
1.1 RabbitMQ定义
RabbitMQ是一个消息代理和队列服务器,用来在不同应用之间共享数据,是Erlang语言开发的,基于AMQP协议。
1.2 AMQP定义
是一个二进制协议。
1.3 AMQP协议模型
AMQP协议模型1.4 核心概念
1. Server:Broker,接受客户端连接
2. Connection:连接,应用程序与Broker的网络连接
3. Channel:网络信道,Channel是消息读写的通道
4. Message:消息,传递的数据,有properties何body组成,properties是消息的属性(可以设置顺序ID),body是消息内容
5. Virtual-Host:虚拟地址,用于"逻辑隔离",最上层的"消息路由",一个Virtual-Host中有多个Exchange和Queue,但是不能有同名的
6. Exchange:交换机,接受消息,根据路由键转发消息到绑定的队列
7. Binding:Exchange和Queue之间的虚拟连接,binding中可以包含routing-key
8. Routing-key:一个路由规则,虚拟机可用他来确定如何路由一个特定消息
9. Queue:消息队列,保存消息并将它们转发给消费者
1.5 整体架构图
整体架构图1.6 消息流转图
消息流转图1.7 交换机图
交换机图==2. 安装与配置==
2.1 准备
1. rabbitMQ版本要与erlang版本对应起来
2. rabbitMQ-rpm和erlang-rpm可以去官网下载,tcp_wrappers、socat可以去https://pkgs.org下载
2.2 安装+启动
1. rpm -ivh erlang-18.3-1.el7.centos.x86_64.rpm
2. rpm -ivh rabbitmq-server-3.6.5-1.noarch.rpm
3. rpm -ivh socat-1.7.3.2-1.1.el7.x86_64.rpm 如果上一步提示缺少socat
4. rpm -ivh tcp_wrappers-7.6-77.el7.x86_64.rpm 如果上一步提示缺少tcp_wrappers
5. rabbitmq-server start &
6. rabbitmqctl stop_app
7. rabbitmq-plugins enable rabbitmq_managment
2.3 常用命令
1. rabbitmqctl stop_app
2. rabbitmqctl start_app
3. rabbitmqctl status 节点状态
4. rabbitmqctl list_users 列出所有用户
5. rabbitmqctl list_user_permissions username 列出用户权限
6. rabbitmqctl change_password username newpwd 修改用户密码
7. rabbitmqctl list_vhosts 列出所有虚拟主机
8. rabbitmqctl list_permissions -p vhostpath 列出该虚拟主机的所有权限
9. rabbitmqctl list_queues 列出所有队列
10. rabbitmqctl reset 移除所有数据
11. rabbitmqctl join_cluster <cluster-node> [--ram] 组成集群命令
12. rabbitmqctl cluster_status 查看集群状态
==3. Exchange==
3.1 交换机属性
1. name:交换机名称
2. type:交换机类型(direct、topic、fanout、headers)
3. durability:是否需要持久化,true为持久化
4. auto-delete:Exchange上的最后一个Queue被删除后,自动删除该Exchange
5. arguments:自定义参数
3.2 DirectExchange-直连
发送到DirectExchange的消息,会被转发到RouteKey中指定的Queue。
Direct可以使用Default-Exchange,不需要进行任何的binding操作,消息传递时,RouteKey必须完全匹配。
DirectExchange
3.3 TopicExchange-匹配
发送到TopicExchange的消息,会被转发到,匹配RouteKey中指定的Queue。
#:匹配多个词
*:匹配一个词
TopicExchange
3.4 FanoutExchange
不处理路由键,只要将队列绑定到交换机上;
发送到交换机上的消息,都会被转发到,与该交换机绑定的所有队列上;
FanoutExchange转发消息是最快的;
FanoutExchange
==4. Binding+Queue+Message+Virtual==
4.1 Binding-绑定
Exchange<-->Exchange,Exchange<-->Queue,他们之间的绑定关系
Binding中可以包含RouteKey或者参数
4.2 Queue-消息队列
实际存储消息数据
Durability:是否持久化,Durable:是,Transient:否
Auto-Delete:如果yes,则最后一个监听被移除后,该Queue也会自动被删除
4.3 Message-消息
应该程序和服务器之间传递的数据,由Properties(可以设置顺序ID)和Body组成
常用属性:delivery_mode、headers(自定义属性)、correlation_id:唯一id、expiration:过期时间
4.4 Virtual-Host-虚拟主机
虚拟地址,用于逻辑隔离,最上层的消息路由
一个Virtual-Host可以有若干个Exchange和Queue,但是同一个Virtual-Host中不能有同名的Exchange和Queue
==5. 高级特性==
5.1 消息如何保证100%的投递成功
image1. 消息落库,对消息状态进行打标
image2. 消息延迟投递,做二次确认,回调检查
5.2 幂等性
1. 定义
幂等性 就是防止高并发的情况下,执行结果都是唯一的。
消费端实现幂等性,就是消息永远被消费一次。
2. 解决方案
1. 唯一ID+指纹码,利用数据库主键去重
SELECT COUNT(1) FROM T_ORDER WHERE 唯一ID + 指纹码
COUNT(1) == 0,则INSERT;
好处:简单
坏处:高并发下有数据库写入的性能瓶颈
解决:根据ID进行分库分表,进行算法路由
2. 利用redis的原子性实现
setnx key value、exists key、redis的自增
问题:
数据是否需要落库,落库的话,缓存和数据库如何保证原子性?
数据不落库,如何设置定时同步策略?
5.3 confirm确认消息
1. 在channel中开启确认模式:channel.confirmSelect();
2. 在channel中添加监听:addConfirmListener();
3. 发生Nack的情况:磁盘写满、Queue达到上线、MQ其他异常
4. ack和Nack都收不到的情况:就要定时任务去处理
5.4 return消息机制
如果发送的消息,Exchange不存在或者RouteKey路由不到,这时就需要returnListener。
Mandatory:true-监听器接受到这些不可达的消息,false-broker会自动删除这些消息。
消费端自定义监听:继承DefaultConsumer
image
5.5 消费端限流
生产端不会限流,只有消费端限流;当机器突然有上万条消息,不做限流,可能会导致消费端服务器崩溃。
RabbitMQ提供了qos功能:非自动签收消息的情况下,一定数量消息未被确认前(通过consumer或channel设置qos值),不进行消费新的消息
void BasicQos(uint prefetchSize = 0 不限制消息大小,
ushort prefetchCount = 1 一次处理1条,手动ack后,在处理另一条,
bool global = false 这个限制是channel级别还是consumer级别);
consumer-->handleDelivery-->channel.basicAck(envelope.getDeliveryTag(), false);
consumer-->handleDelivery-->channel.basicNack(envelope.getDeliveryTag(), false, true-->重发);
5.6 TTL队列/消息
Time To Live 生存时间
支持消息的过期时间和队列的过期时间
5.7 DLX-死信队列
当消息变成死信(没有被消费者消费掉)的时候,他将被重新发送到另一个Exchange,这个Exchange就是死信队列
消息变成死信的情况:
1. 消息被拒绝(basic.reject/basic.nack)并且requeue=false
2. TTL过期
3. 队列打到最大长度
在队列上添加:arguments.put("s-dead-letter-exchange", "dlx.exchange");
==6. Spring-Boot-Demo==
6.1 pom依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
6.2 application.yml
1. 公共配置
spring:
rabbitmq:
addresses: 192.168.11.76:5672
username: guest
password: guest
virtual-host: /
connection-timeout: 15000
2. 生产端配置
publisher-confirms: true
publisher-returns: true
template:
mandatory: true # 保证监听有效
3. 消费端配置
listener:
simple:
acknowledge-mode: manual
concurrency: 5
max-concurrency: 10
order:
key: springboot.*
queue:
name: queue-1
durable: true
exchange:
name: exchange-1
durable: true
type: topic
ignoreDeclarationExceptions: true
6.3 生产端
@Component
public class RabbitSender {
@Autowired
private RabbitTemplate rabbitTemplate;
// 回调函数: confirm确认
final ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
System.err.println("correlationData: " + correlationData);
System.err.println("ack: " + ack);
if(!ack){
System.err.println("异常处理...");
}
}
};
// 回调函数: return返回
final ReturnCallback returnCallback = new RabbitTemplate.ReturnCallback() {
@Override
public void returnedMessage(org.springframework.amqp.core.Message message, int replyCode, String replyText,
String exchange, String routingKey) {
System.err.println("return exchange: " + exchange + ", routingKey: "
+ routingKey + ", replyCode: " + replyCode + ", replyText: " + replyText);
}
};
// 发送消息方法调用: 构建Message消息
public void send(Object message, Map<String, Object> properties) throws Exception {
MessageHeaders mhs = new MessageHeaders(properties);
Message msg = MessageBuilder.createMessage(message, mhs);
rabbitTemplate.setConfirmCallback(confirmCallback);
rabbitTemplate.setReturnCallback(returnCallback);
CorrelationData correlationData = new CorrelationData("1234567890"); // id + 时间戳 全局唯一
rabbitTemplate.convertAndSend("exchange-1", "springboot.abc", msg, correlationData);
}
// 发送消息方法调用: 构建自定义对象消息
public void sendOrder(Order order) throws Exception {
rabbitTemplate.setConfirmCallback(confirmCallback);
rabbitTemplate.setReturnCallback(returnCallback);
CorrelationData correlationData = new CorrelationData("0987654321"); //id + 时间戳 全局唯一
rabbitTemplate.convertAndSend("exchange-2", "springboot.def", order, correlationData);
}
}
6.4 消费端
@Component
public class RabbitReceiver {
@RabbitListener(bindings = @QueueBinding(value = @Queue(value = "queue-1", durable="true"),
exchange = @Exchange(value = "exchange-1", durable="true", type= "topic", ignoreDeclarationExceptions = "true"), key = "springboot.*"))
@RabbitHandler
public void onMessage(Message message, Channel channel) throws Exception {
System.err.println("消费端Payload: " + message.getPayload());
Long deliveryTag = (Long)message.getHeaders().get(AmqpHeaders.DELIVERY_TAG);
// 手动ACK
channel.basicAck(deliveryTag, false);
}
@Value("spring.rabbitmq.listener.order.key")
private String orderKey;
@Value("spring.rabbitmq.listener.order.queue.name")
private String orderQueueName;
@Value("spring.rabbitmq.listener.order.queue.durable")
private String orderQueueDurable;
@Value("spring.rabbitmq.listener.order.exchange.name")
private String orderExchangeName;
@Value("spring.rabbitmq.listener.order.exchange.durable")
private String orderExchangeDurable;
@Value("spring.rabbitmq.listener.order.exchange.type")
private String orderExchangeType;
@Value("spring.rabbitmq.listener.order.exchange.ignoreDeclarationExceptions")
private String orderExchangeIgnoreDeclarationExceptions;
@RabbitListener(bindings = @QueueBinding(value = @Queue(value = orderQueueName, durable = orderQueueDurable),
exchange = @Exchange(value = orderExchangeName, durable = orderExchangeDurable, type = orderExchangeType, ignoreDeclarationExceptions = orderExchangeIgnoreDeclarationExceptions),
key = orderKey))
@RabbitHandler
public void onOrderMessage(@Payload com.bfxy.springboot.entity.Order order,
Channel channel,
@Headers Map<String, Object> headers) throws Exception {
System.err.println("消费端order: " + order.getId());
Long deliveryTag = (Long)headers.get(AmqpHeaders.DELIVERY_TAG);
// 手动ACK
channel.basicAck(deliveryTag, false);
}
}
==7. Spring-Cloud-Stream==
7.1 架构图
imageimage
image
7.2 概念
Barista接口:用来定义通道的类型和名称,通道名称作为配置用,通道类型作为该通道是发送消息还是接受消息
@output:输出注解
@input:输入注解
@StreamListener:监听消息注解
7.3 Demo
7.3.1 pom依赖
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
<version>1.3.4.RELEASE</version>
</dependency>
7.3.2 producer-application.yml
spring:
cloud:
stream:
bindings:
output_channel:
destination: exchange-3
group: queue-3
binder: rabbit-cluster
binders:
rabbit-cluster:
type: rabbit
environment:
spring:
rabbitmq:
addresses: 192.168.11.76:5672
username: guest
password: guest
virtual-host: /
7.3.3 定义通道
public interface Barista {
String OUTPUT_CHANNEL = "output_channel";
// @Output声明了它是一个输出类型的通道,名字是output_channel。
@Output(Barista.OUTPUT_CHANNEL)
MessageChannel logoutput();
}
7.3.4 发送消息
@EnableBinding(Barista.class)
@Service
public class RabbitmqSender {
@Autowired
private Barista barista;
// 发送消息
public String sendMessage(Object message, Map<String, Object> properties) throws Exception {
try{
MessageHeaders mhs = new MessageHeaders(properties);
Message msg = MessageBuilder.createMessage(message, mhs);
boolean sendStatus = barista.logoutput().send(msg);
System.out.println("发送数据:" + message + ",sendStatus: " + sendStatus);
}catch (Exception e){
e.printStackTrace();
}
return null;
}
}
7.3.5 consumer-application.yml
spring:
cloud:
stream:
bindings:
input_channel:
destination: exchange-3
group: queue-3
binder: rabbit-cluster
consumer:
concurrency: 1
rabbit:
bindings:
input_channel:
consumer:
requeue-rejected: false # 是否支持重发
acknowledge-mode: MANUAL # 手动签收
recovery-interval: 3000 # 3s重连
durable-subscription: true # 是否启用持久化订阅
max-concurrency: 5 # 最大监听数
binders:
rabbit-cluster:
type: rabbit
environment:
spring:
rabbitmq:
addresses: 192.168.11.76:5672
username: guest
password: guest
virtual-host: /
7.3.6 定义通道
public interface Barista {
String INPUT_CHANNEL = "input_channel";
// @Input声明了它是一个输入类型的通道,名字是Barista.INPUT_CHANNEL。
@Input(Barista.INPUT_CHANNEL)
SubscribableChannel loginput();
}
7.3.7 消费消息
@EnableBinding(Barista.class)
@Service
public class RabbitmqReceiver {
@StreamListener(Barista.INPUT_CHANNEL)
public void receiver(Message message) throws Exception {
Channel channel = (com.rabbitmq.client.Channel) message.getHeaders().get(AmqpHeaders.CHANNEL);
Long deliveryTag = (Long) message.getHeaders().get(AmqpHeaders.DELIVERY_TAG);
System.out.println("Input Stream 1 接受数据:" + message);
channel.basicAck(deliveryTag, false);
}
}
==8. RabbitMQ集群架构模式==
8.1 简介
1. 主备模式:实现高可用集群,一般在并发和数据量不高的情况下使用,也称Warren模式。
2. 远程模式:实现双活的模式,也称Shovel模式,消息进行不同数据中心的复制工作,可以跨地域的两个MQ集群互联。
3. 镜像模式:也称Mirror模式,保证100%数据不丢失,简单、用的多。
镜像队列:保证数据高可靠性方案,主要是实现数据同步
8.2 架构模式图
image8.2.1 镜像模式
image8.2.2 多活模式
image
==9. 架构设计==
9.1 SET化架构
业务:解决业务遇到的扩展性和容灾等需求,支撑业务的高速发展
通用性:架构形成统一解决方案,岸边各业务线接入使用
image
image
image