2019-10-30
MQ
MQ
优势
劣势
1.系统可用性降低
2.系统的复杂度提高
3.一致性
常见的MQ的产品:RabbitMQ,RocketMQ(阿里),kafka
简介
RabbitMQ是由erlang语言开发,基于AMQP(Advanced Message Queue 高级消息队列协议)协议实现的消息队列,它是一种应用程序之间的通信方法,消息队列在分布式系统开发中应用非常广泛。
子主题 3
快速入门
需求:完成消息的传递
子主题 2
高级特性
消息可靠性投递(提供者发送消息,消费者消费消息包含两种模式)
confirm:确认模式
return:腿回模式
Consumer ACK(消费端收到消息后的确认方式)
自动确认;acknowledge="**none**"
手动确认:acknowledge="**manual**"
消费端限流
TTL(TTL 全称 Time To Live(存活时间/过期时间))
1配置文件
<!--ttl-->
<rabbit:queue name="test_queue_ttl" id="test_queue_ttl">
<!--设置queue的参数-->
<rabbit:queue-arguments>
<!--x-message-ttl指队列的过期时间-->
<entry key="x-message-ttl" value="100000" value-type="java.lang.Integer"/>
</rabbit:queue-arguments>
</rabbit:queue>
<rabbit:topic-exchange name="test_exchange_ttl" >
<rabbit:bindings>
<rabbit:binding pattern="ttl.#" queue="test_queue_ttl"></rabbit:binding>
</rabbit:bindings>
</rabbit:topic-exchange>
2测试类
@Test
public void testTtl() {
for (int i = 0; i < 10; i++) {
// 发送消息
rabbitTemplate.convertAndSend("test_exchange_ttl",
"ttl.hehe", "message ttl....");
}
}
3.设置单个消息的过期时间
@Test
public void testTtl() {
// 消息后处理对象,设置一些消息的参数信息
MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
//1.设置message的信息
message.getMessageProperties().setExpiration("5000");//消息的过期时间
//2.返回该消息
return message;
}
};
//消息单独过期
rabbitTemplate.convertAndSend("test_exchange_ttl",
"ttl.hehe", "message ttl....",messagePostProcessor);
for (int i = 0; i < 10; i++) {
if(i == 5){
//消息单独过期
rabbitTemplate.convertAndSend("test_exchange_ttl", "ttl.hehe", "message ttl....",messagePostProcessor);
}else{
//不过期的消息
rabbitTemplate.convertAndSend("test_exchange_ttl", "ttl.hehe", "message ttl....");
}
}
}
死信队列
消息成为死信的三种情况:**
1. 队列消息长度到达限制;
2. 消费者拒接消费消息,basicNack/basicReject,并且不把消息重新放入原目标队列,requeue=false;
3. 原队列存在消息过期设置,消息到达超时时间未被消费;
延迟队列
TTL+死信队列
日志与监控
消息可靠性分析与追踪
管理
应用问题
消息可靠信保障
消息幂等性处理(一个消息多次处理结果是一样的)
集群搭建
RabbitMQ高可用集群
镜像集群配置
负载均衡-HAProxy