互联网科技Java 杂谈

RabbitMQ 延迟队列,消息延迟推送

2019-06-05  本文已影响2人  a13ed6c7cc5e

应用场景

image

目前常见的应用软件都有消息的延迟推送的影子,应用也极为广泛,例如:

在上面两种场景中,如果我们使用下面两种传统解决方案无疑大大降低了系统的整体性能和吞吐量:

消息延迟推送的实现

image

首先我们创建交换机和消息队列

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class MQConfig {

 public static final String LAZY_EXCHANGE = "Ex.LazyExchange";
 public static final String LAZY_QUEUE = "MQ.LazyQueue";
 public static final String LAZY_KEY = "lazy.#";

 @Bean
 public TopicExchange lazyExchange(){
 //Map<String, Object> pros = new HashMap<>();
 //设置交换机支持延迟消息推送
 //pros.put("x-delayed-message", "topic");
 TopicExchange exchange = new TopicExchange(LAZY_EXCHANGE, true, false, pros);
 exchange.setDelayed(true);
 return exchange;
 }

 @Bean
 public Queue lazyQueue(){
 return new Queue(LAZY_QUEUE, true);
 }

 @Bean
 public Binding lazyBinding(){
 return BindingBuilder.bind(lazyQueue()).to(lazyExchange()).with(LAZY_KEY);
 }
}

我们在 Exchange 的声明中可以设置exchange.setDelayed(true)来开启延迟队列,也可以设置为以下内容传入交换机声明的方法中,因为第一种方式的底层就是通过这种方式来实现的。

 //Map<String, Object> pros = new HashMap<>();
 //设置交换机支持延迟消息推送
 //pros.put("x-delayed-message", "topic");
 TopicExchange exchange = new TopicExchange(LAZY_EXCHANGE, true, false, pros);

发送消息时我们需要指定延迟推送的时间,我们这里在发送消息的方法中传入参数 new MessagePostProcessor() 是为了获得 Message对象,因为需要借助 Message对象的api 来设置延迟时间。

import com.anqi.mq.config.MQConfig;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.Date;

@Component
public class MQSender {

 @Autowired
 private RabbitTemplate rabbitTemplate;

 //confirmCallback returnCallback 代码省略,请参照上一篇

 public void sendLazy(Object message){
 rabbitTemplate.setMandatory(true);
 rabbitTemplate.setConfirmCallback(confirmCallback);
 rabbitTemplate.setReturnCallback(returnCallback);
 //id + 时间戳 全局唯一
 CorrelationData correlationData = new CorrelationData("12345678909"+new Date());

 //发送消息时指定 header 延迟时间
 rabbitTemplate.convertAndSend(MQConfig.LAZY_EXCHANGE, "lazy.boot", message,
 new MessagePostProcessor() {
 @Override
 public Message postProcessMessage(Message message) throws AmqpException {
 //设置消息持久化
 message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
 //message.getMessageProperties().setHeader("x-delay", "6000");
 message.getMessageProperties().setDelay(6000);
 return message;
 }
 }, correlationData);
 }
}

我们可以观察 setDelay(Integer i)底层代码,也是在 header 中设置 x-delay。等同于我们手动设置 header

message.getMessageProperties().setHeader("x-delay", "6000");

/**
 * Set the x-delay header.
 * @param delay the delay.
 * @since 1.6
 */
public void setDelay(Integer delay) {
 if (delay == null || delay < 0) {
 this.headers.remove(X_DELAY);
 }
 else {
 this.headers.put(X_DELAY, delay);
 }
}

消费端进行消费

import com.rabbitmq.client.Channel;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.util.Map;

@Component
public class MQReceiver {

 @RabbitListener(queues = "MQ.LazyQueue")
 @RabbitHandler
 public void onLazyMessage(Message msg, Channel channel) throws IOException{
 long deliveryTag = msg.getMessageProperties().getDeliveryTag();
 channel.basicAck(deliveryTag, true);
 System.out.println("lazy receive " + new String(msg.getBody()));

 }

测试结果

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@SpringBootTest
@RunWith(SpringRunner.class)
public class MQSenderTest {

 @Autowired
 private MQSender mqSender;

 @Test
 public void sendLazy() throws Exception {
 String msg = "hello spring boot";

 mqSender.sendLazy(msg + ":");
 }
}

果然在 6 秒后收到了消息 lazy receive hello spring boot:

Java学习、面试;文档、视频资源免费获取

上一篇 下一篇

猜你喜欢

热点阅读