rabbitmq发送延迟消息

2020-02-28  本文已影响0人  无尘粉笔

软件准备

erlang

本文使用的版本是:Erlang 20.3

RabbitMQ

本文使用的是 window 版本的RabbitMQ,版本号是:3.7.4

rabbitmq_delayed_message_exchange插件

插件下载地址:http://www.rabbitmq.com/community-plugins.html

打开网址后,ctrl + f,搜索 rabbitmq_delayed_message_exchange 。

[图片上传失败...(image-568eaa-1582857295002)]

千万记住,一定选好版本号,由于我使用的是RabbitMQ 3.7.4,因此对应的 rabbitmq_delayed_message_exchange 插件也必须选择3.7.x的。

如果没有选对版本,在使用延迟消息的时候,会遇到各种各样的奇葩问题,而且网上还找不到解决方案。我因为这个问题,折腾了整整一个晚上。请牢记,要选对插件版本。

下载完插件后,将其放置到RabbitMQ安装目录下的 plugins 目录下,并使用如下命令启动这个插件:

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

如果启动成功会出现如下信息:

The following plugins have been enabled: rabbitmq_delayed_message_exchange

启动插件成功后,记得重启一下RabbitMQ,让其生效。

集成RabbitMQ

这个就非常简单了,直接在maven工程的pom.xml文件中加入

<``dependency``>

<``groupId``>org.springframework.boot</``groupId``>

<``artifactId``>spring-boot-starter-amqp</``artifactId``>

</``dependency``>

Spring Boot的版本我使用的是 2.0.1.RELEASE .

接下来在 application.properties 文件中加入redis配置:

spring.rabbitmq.host=127.0.0.1

spring.rabbitmq.port=5672

spring.rabbitmq.username=guest

spring.rabbitmq.password=guest

定义ConnectionFactory和RabbitTemplate

也很简单,代码如下:

package com.mq.rabbitmq;

import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;

import org.springframework.amqp.rabbit.connection.ConnectionFactory;

import org.springframework.amqp.rabbit.core.RabbitTemplate;

import org.springframework.boot.context.properties.ConfigurationProperties;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

@Configuration

@ConfigurationProperties``(prefix = "spring.rabbitmq"``)

public class RabbitMqConfig {

private String host;

private int port;

private String userName;

private String password;

@Bean

public ConnectionFactory connectionFactory() {

CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory(host,port);

cachingConnectionFactory.setUsername(userName);

cachingConnectionFactory.setPassword(password);

cachingConnectionFactory.setVirtualHost(``"/"``);

cachingConnectionFactory.setPublisherConfirms(``true``);

return cachingConnectionFactory;

}

@Bean

public RabbitTemplate rabbitTemplate() {

RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());

return rabbitTemplate;

}

public String getHost() {

return host;

}

public void setHost(String host) {

this``.host = host;

}

public int getPort() {

return port;

}

public void setPort(``int port) {

this``.port = port;

}

public String getUserName() {

return userName;

}

public void setUserName(String userName) {

this``.userName = userName;

}

public String getPassword() {

return password;

}

public void setPassword(String password) {

this``.password = password;

}

}

Exchange和Queue配置

package com.mq.rabbitmq;

import org.springframework.amqp.core.*;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

import java.util.HashMap;

import java.util.Map;

@Configuration

public class QueueConfig {

@Bean

public CustomExchange delayExchange() {

Map<String, Object> args = new HashMap<>();

args.put(``"x-delayed-type"``, "direct"``);

return new CustomExchange(``"test_exchange"``, "x-delayed-message"``,``true``, false``,args);

}

@Bean

public Queue queue() {

Queue queue = new Queue(``"test_queue_1"``, true``);

return queue;

}

@Bean

public Binding binding() {

return BindingBuilder.bind(queue()).to(delayExchange()).with(``"test_queue_1"``).noargs();

}

}

这里要特别注意的是,使用的是 CustomExchange ,不是 DirectExchange ,另外 CustomExchange 的类型必须是 x-delayed-message 。

实现消息发送

package com.mq.rabbitmq;

import org.springframework.amqp.AmqpException;

import org.springframework.amqp.core.Message;

import org.springframework.amqp.core.MessagePostProcessor;

import org.springframework.amqp.rabbit.core.RabbitTemplate;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.stereotype.Service;

import java.text.SimpleDateFormat;

import java.util.Date;

@Service

public class MessageServiceImpl {

@Autowired

private RabbitTemplate rabbitTemplate;

public void sendMsg(String queueName,String msg) {

SimpleDateFormat sdf = new SimpleDateFormat(``"yyyy-MM-dd HH:mm:ss"``);

System.out.println(``"消息发送时间:"``+sdf.format(``new Date()));

rabbitTemplate.convertAndSend(``"test_exchange"``, queueName, msg, new MessagePostProcessor() {

@Override

public Message postProcessMessage(Message message) throws AmqpException {

message.getMessageProperties().setHeader(``"x-delay"``,``3000``);

return message;

}

});

}

}

注意在发送的时候,必须加上一个header

x-delay

在这里我设置的延迟时间是3秒。

消息消费者

package com.mq.rabbitmq;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;

import org.springframework.amqp.rabbit.annotation.RabbitListener;

import org.springframework.stereotype.Component;

import java.text.SimpleDateFormat;

import java.util.Date;

@Component

public class MessageReceiver {

@RabbitListener``(queues = "test_queue_1"``)

public void receive(String msg) {

SimpleDateFormat sdf = new SimpleDateFormat(``"yyyy-MM-dd HH:mm:ss"``);

System.out.println(``"消息接收时间:"``+sdf.format(``new Date()));

System.out.println(``"接收到的消息:"``+msg);

}

}

运行Spring Boot程序和发送消息

直接在main方法里运行Spring Boot程序,Spring Boot会自动解析 MessageReceiver 类的。

接下来只需要用Junit运行一下发送消息的接口即可。

package com.mq.rabbitmq;

import org.junit.Test;

import org.junit.runner.RunWith;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.boot.test.context.SpringBootTest;

import org.springframework.test.context.junit4.SpringRunner;

@RunWith``(SpringRunner.``class``)

@SpringBootTest

public class RabbitmqApplicationTests {

@Autowired

private MessageServiceImpl messageService;

@Test

public void send() {

messageService.sendMsg(``"test_queue_1"``,``"hello i am delay msg"``);

}

}

上一篇下一篇

猜你喜欢

热点阅读