基于RabbitMQ的延迟队列
什么是延迟队列
延迟队列,即消息发送之后,在一段时间之后延迟被消费端消费的消息队列。比如我们发送一条消息,希望在半个小时之后才可以被消费端消费到的这种场景中就可以用到延迟队列了。
利用rabbitmq实现延迟队列
rabbitmq的3.6.版本中可以使用一个插件rabbitmq-delayed-message-exchange 构建一个的延迟队列*。
如何安装和使用rabbitmq可以参照我之前的文章:
安装
下载 https://dl.bintray.com/rabbitmq/community-plugins/rabbitmq_delayed_message_exchange-0.0.1.ez 插件安装到rabbitmq
的插件目录 (一般是/usr/lib/rabbitmq/plugins 或者 /usr/lib/rabbitmq/lib/rabbitmq_server-version/plugins 目录)
然后在rabbitmq-server
运行的状态下,运行这条命令:
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
在spring应用中使用这个特性
public static final String QUEUE_NAME = "delay_queue";
public static final String EXCHANGE_NAME = "delay_exchange";
@Bean
Queue queue() {
return new Queue(QUEUE_NAME, true);
}
// 定义一个延迟交换机
@Bean
CustomExchange delayExchange() {
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-delayed-type", "direct");
return new CustomExchange(EXCHANGE_NAME, "x-delayed-message", true, false, args);
}
// 绑定队列到这个延迟交换机上
@Bean
Binding binding(Queue queue, CustomExchange delayExchange) {
return BindingBuilder.bind(queue).to(delayExchange).with(QUEUE_NAME).noargs();
}
上面定义了一个x-delayed-message
类型的交换机,由于Spring AMQP
中没有这个类型的交换机,所以我们使用一个CustomExchange
来定义这个插件构建的交换机。
发送消息到这个交换机上:
MessageProperties properties = new MessageProperties();
properties.setDelay(5000);
Message message = new Message("delay_test_message".getBytes(), properties);
rabbitTemplate.send(RabbitMQConfiguration.EXCHANGE_NAME, RabbitMQConfiguration.QUEUE_NAME, message);
我惊讶的发现,Spring AMQP居然已经在方法上支持了x-delay
这个属性,但是奇怪的是他们没有提供适配延迟队列的交换机类,还需要自己定义一个CustomExchange
。另外一边我定义了一个Listener
类:
@Component
public class DelayListener {
// 消息转换器
@RabbitListener(queues = RabbitMQConfiguration.QUEUE_NAME)
public void consumer(Message message) {
System.out.println(new Date() + " ---> " + new String(message.getBody()));
}
}
我发送出这条消息:
send amqp message in Mon Oct 09 20:46:02 CST 2017
在5秒之后,确实在接收到了这条消息:
Mon Oct 09 20:46:07 CST 2017 ---> delay_test_message
rabbitmq之外的方案
之前我也考虑利用redis
的expire
命令配合发布/订阅模型构建延迟队列,最终发现redis
完成这个场景有一个很大的弊端,一旦消息被订阅,消费端的服务没有处理成功,那么这条消息就非常有可能再也无法处理了。除此之外,RocketMQ
内置了延迟队列的功能,但是只能支持1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
这么几种固定时间的延迟消息,自定义不是非常方便。