RabbitMQ 消息队列
2020-05-12 本文已影响0人
潜心之力
一、Windows 安装
- 下载erlang,官方地址:http://www.erlang.org/downloads,找到win-64-bit的安装包进行下载安装,安装过程直接选next步骤,自定义安装位置。
- 配置erlang环境,进入系统高级设置,新建环境变量,变量名:ERLANG_HOME,变量值:D:\Program Files\erl10.7,新建完成后修改环境变量Path,将%ERLANG_HOME%\bin;值追加至原来的Path变量值中。修改完成后打开命令行窗口,输入erl回车,出现版本信息证明安装成功,Eshell V10.7 (abort with ^G)。
- 下载rabbitmq,官方地址:http://www.rabbitmq.com/download.html,找到标题Downloads on Bintray下的Windows installer,点击进行下载安装,过程同样直接选next步骤和自定义安装位置。
- 下载RabbitMQ-Plugins,打开命令行窗口,D:,cd D:\Program Files\RabbitMQ Server\rabbitmq_server-3.8.3\sbin,rabbitmq-plugins enable rabbitmq_management,先定位到sbin目录下再执行安装语句。 image.png
- 测试:进入安装目录sbin,双击rabbitmq-server.bat,访问http://localhost:15672/,账号和密码都是guest,登录成功则所有安装步骤已完成。
二、整合SpringBoot
<dependency> -> pom.xml
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
spring: -> application.yml
rabbitmq:
host: 127.0.0.1
port: 5672
username: guest
password: guest
三、交换机模式
1、Direct模式是RabbitMQ默认的交换机模式,创建消息队列的时候绑定一个BindingKey,可以理解为消息队列的名称,生产者发送消息的时候绑定BindingKey,可以理解为将消息发送到指定的队列中,消费者监听队列的时候绑定BindingKey,可以理解为读取指定队列中的消息。直连模式是典型的一对一,当生产者发送一个消息到队列中,只要有任意一个消费者将消息从队列中取出,消息立即被消耗,其他的消费者不会获取到该消息。
@Component
public class RabbitSender { -> 消息生产者,用于发送消息到队列
@Autowired
private AmqpTemplate mAmqpTemplate;
public void message(String msg){
mAmqpTemplate.convertAndSend("DirectQueue",msg);
}
}
@Component
@RabbitListener(queues = "DirectQueue")
public class RabbitReceiver { -> 消息消费者,用于接收处理队列中的消息
@RabbitHandler
public void message(String msg){
System.out.println("收到消息:"+msg);
}
}
@Configuration
public class RabbitConfig { -> 创建消息队列
@Bean
public Queue DirectQueue(){
return new Queue("DirectQueue");
}
}
2、Topic模式支持按指定规则匹配转发,路由键是一段带有“.”分隔的字符串。#代表匹配一个或多个任意字符,*代表匹配一个任意字符。主题模式是典型的一对多,生产者可以通过指定的规则模糊匹配相关的队列进行消息投递,灵活性高。
@Component
public class RabbitSender {
@Autowired
private AmqpTemplate mAmqpTemplate;
public void message1(String msg){
mAmqpTemplate.convertAndSend("exchange","topic.queue1",msg);
}
public void message2(String msg){
mAmqpTemplate.convertAndSend("exchange","topic.queue2",msg);
}
}
@Component
public class RabbitReceiver {
@RabbitHandler
@RabbitListener(queues = "topic.queue1")
public void message1(String msg) {
System.out.println("message1:" + msg);
}
@RabbitHandler
@RabbitListener(queues = "topic.queue2")
public void message2(String msg) {
System.out.println("messages2:" + msg);
}
}
@Configuration
public class RabbitConfig {
@Bean
public Queue topicQueue1() {
return new Queue("topic.queue1");
}
@Bean
public Queue topicQueue2() {
return new Queue("topic.queue2");
}
@Bean
TopicExchange exchange() { -> 创建消息交换机
return new TopicExchange("exchange");
}
@Bean
Binding bindingTopicQueue1(Queue topicQueue1, TopicExchange exchange) {
return BindingBuilder.bind(queueMessage).to(exchange).with("topic.queue*");
}
@Bean
Binding bindingTopicQueue2(Queue topicQueue2, TopicExchange exchange) {
return BindingBuilder.bind(queueMessages).to(exchange).with("topic.#");
}
}
3、Fanout模式是广播,所有绑定到该交换机下的队列都能接收到消息,也就意味着在该模式下BindingKey是会被忽略的。
@Component
public class RabbitSender {
@Autowired
private AmqpTemplate mAmqpTemplate;
public void message3(String msg){
mAmqpTemplate.convertAndSend("fanoutExchange","",msg); -> 参数2被忽略
}
}
@Component
public class RabbitReceiver {
@RabbitHandler
@RabbitListener(queues = "fanout.queue1")
public void message1(String msg) {
System.out.println("message1:" + msg);
}
@RabbitHandler
@RabbitListener(queues = "fanout.queue2")
public void message2(String msg) {
System.out.println("messages2:" + msg);
}
@RabbitHandler
@RabbitListener(queues = "fanout.queue3")
public void message3(String msg) {
System.out.println("messages3:" + msg);
}
}
@Configuration
public class RabbitConfig {
@Bean(name="FanoutQueue1")
public Queue AMessage() {
return new Queue("fanout.queue1");
}
@Bean(name="FanoutQueue2")
public Queue BMessage() {
return new Queue("fanout.queue2");
}
@Bean(name="FanoutQueue3")
public Queue CMessage() {
return new Queue("fanout.queue3");
}
@Bean
FanoutExchange fanoutExchange() {
return new FanoutExchange("fanoutExchange");
}
@Bean
Binding bindingFanoutQueue1(@Qualifier("FanoutQueue1") Queue queue,FanoutExchange fanoutExchange) {
return BindingBuilder.bind(queue).to(fanoutExchange);
}
@Bean
Binding bindingFanoutQueue2(@Qualifier("FanoutQueue2") Queue queue, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(queue).to(fanoutExchange);
}
@Bean
Binding bindingFanoutQueue3(@Qualifier("FanoutQueue3") Queue queue, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(queue).to(fanoutExchange);
}
}
四、Windows 集群
简介:集群(cluster)就是一组计算机,它们作为一个整体向用户提供一组网络资源,这些单个的计算机系统就是集群的节点(node)。集群的特点如下:1、可扩展性,集群的性能不仅限于单一的服务实体,新的服务实体也可以动态地加入到集群。2、高可用性,当一台节点服务器发生故障的时候,这台服务器上所运行的应用程序将被另一节点的服务器自动接管。3、负载均衡,把任务均匀地分布到集群环境下的网络资源,以便提高数据吞吐量。
- 准备好多台计算机或虚拟机,统一安装RabbitMQ和配置以下步骤。
- 查看节点名称,打开命令行窗口,D:,cd D:\Program Files\RabbitMQ Server\rabbitmq_server-3.8.3\sbin,rabbitmqctl status。回车确定后的第一行信息就能看到节点名称,Status of node rabbit@WINDOWS-OCL0DO8 ...
- 创建集群配置文件,进入目录C:\Users\Administrator\AppData\Roaming\RabbitMQ,其中AppData是隐藏目录需要手动将其显示,创建文件rabbitmq.config并往里面输入[{rabbit,[{cluster_nodes, ['rabbit@WINDOWS-OCL0DO7', 'rabbit@WINDOWS-OCL0DO8']}]}].(这里的点不要忽略)。
- 修改hosts配置文件,进入目录C:\Windows\System32\drivers\etc找到hosts并输入192.168.0.87 rabbit@WINDOWS-OCL0DO7、192.168.0.87 rabbit@WINDOWS-OCL0DO8(这里分多行,首行IP必须是当前计算机)。
- 创建环境变量文件,进入目录C:\Users\Administrator\AppData\Roaming\RabbitMQ,创建文件rabbitmq-env.conf并往里面输入NODENAME=rabbit@WINDOWS-OCL0DO7、NODE_IP_ADDRESS=192.168.0.87、NODE_PORT=5672、RABBITMQ_MNESIA_BASE=C:\Users\Administrator\AppData\Roaming\RabbitMQ\db、RABBITMQ_LOG_BASE=C:\Users\Administrator\AppData\Roaming\RabbitMQ\log(这里分多行,节点名称、IP地址、端口都配置为当前计算机的)
- 统一.erlang.cookie文件,该文件作为集群的通信密钥。这里要把集群中的计算机分为一主多干,把主计算机中的C:\Users\Administrator\.erlang.cookie文件复制并替换到干计算机中的C:\Users\Administrator\.erlang.cookie文件和C:\Windows\System32\config\systemprofile\.erlang.cookie
- 重启集群中的干分支
rabbitmqctl stop_app、rabbitmqctl reset、rabbitmqctl start_app - 重启集群中的主分支并纳入干分支
rabbitmqctl stop_app、rabbitmqctl reset、rabbitmqctljoin_cluster rabbit@WINDOWS-OCL0DO8、rabbitmqctl start_app - 注意事项:搭建集群建议最少设置一个磁盘节点以防止机器发生意外丢失数据。rabbitmq有两种类型的节点分别为磁盘和内存,顾名思义就是磁盘节点数据存放在磁盘,安全性高读取效率低,内存节点数据存放在内存,安全性低读取效率高,节点默认的开启方式是磁盘。
- 集群和分布式的区别:分布式是指通过网络连接的多个组件,集群是指同一种组件的多个实例。
- 镜像配置,目的是同步消息 mirrow