RabbitMQ ACK、NACK、Type、TTL、死信

2020-09-28  本文已影响0人  dylan丶QAQ

起因:在实际项目开发过程中,需要使用RabbitMQ来实现消息队列的功能,比如说我发布一个动态之后,需要在30分钟使用默认用户给他点几个赞,之前考虑使用redis的zset对他进行操作,之后决定使用RabbitMQ,专业的事情使用专业的工具来操作。


1. 消息接收的应答模式ACK和NACK的使用

import com.icoding.basic.po.OrderInfo;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
import java.util.Map;
​
@Component
public class OrderReceiver {
​
 int flag = 0;
​
 @RabbitListener(bindings = @QueueBinding(
 value = @Queue(value = "order-queue",durable = "true",autoDelete = "false"),
 exchange = @Exchange(value = "order-exchange",durable = "true",type = "topic"),
 key = "order.*"
 )
 )
 @RabbitHandler
 public void onOrderMessage(@Payload OrderInfo orderInfo, @Headers Map<String,Object> headers, Channel channel) throws Exception{
 System.out.println("************消息接收开始***********");
 System.out.println("Order Name: "+orderInfo.getOrder_name());
 Long deliverTag = (Long)headers.get(AmqpHeaders.DELIVERY_TAG);
 //ACK进行签收,第一个参数是标识,第二个参数是批量接收为fasle
 //channel.basicAck(deliverTag,false);
 if(flag>3){
 //说明执行了3次都没有成功
 //消息确认
 channel.basicAck(deliverTag,false);
 //记录这个消息的日志或数据到DB/Redis/file
 }else {
 //可以设置时延迟几秒
 flag = flag+1;
 //前两个参数和上面ACK一样,第三个参数是否重回队列
 channel.basicNack(deliverTag, false, true);
 }
 }
}

2. Exchange交换机Type详解

3. 消息队列的TTL设置和使用

什么是TTL:Time To Live,也就是生存时间

import com.icoding.basic.po.OrderInfo;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
​
@Component
public class OrderSender {
​
 @Autowired
 RabbitTemplate rabbitTemplate;
​
 public void sendOrder(OrderInfo orderInfo) throws Exception{
 /**
 * exchange: 交换机名字
 * routingkey: 队列关联的key
 * object: 要传输的消息对象
 * correlationData: 消息的唯一id
 */
 CorrelationData correlationData = new CorrelationData();
 correlationData.setId(orderInfo.getMessage_id());
 //在这里设置message的TTL
 MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {
 @Override
 public Message postProcessMessage(Message message) throws AmqpException {
 message.getMessageProperties().setExpiration("5000");
 return message;
 }
 };
 //将messagePostProcessor加入到参数中
 rabbitTemplate.convertAndSend("order-exchange","order.update",orderInfo,messagePostProcessor,correlationData);
 }
}

4. 死信队列详解及进入死信队列的机制

首先要看一下什么是死信:当一个消息无法被消费时就变成了死信

死信是怎么形成的:消息在没有被消费前就失效的就属于死信

一个系统中是没有无缘无故生成的消息,如果这个消息失效了没有了,是不是可能导致业务损失,如果这种消息我们需要记录或补偿,将这种消息失效的时候放到一个队列中,待我们人工补偿和消费,这个放死信的队列就是死信队列

希望我们的消息在失效的时候进入到死信队列中

我们的死信队列其实也是一个正常队列,只是赋予了他这个概念

死信队列的另一功能就是延迟消息

x-dead-letter-exchange :这个参数就是指定死信队列的Exchange的名字,这个Exchange就是一个普通的Exchange,需要手工创建

x-dead-letter-routing-key :这个参数就是指定死信队列的Routingkey,这个routingkey需要自己创建好队列和Exchange进行binding时填入

redis的keyevent通过pub/sub机制来订阅信息的,如果sub端在pub发布信息之后订阅就会导致信息丢失,而我们的死信因为时队列所以无所谓什么时候消费


不要以为每天把功能完成了就行了,这种思想是要不得的,互勉~!

上一篇 下一篇

猜你喜欢

热点阅读