快速入门RabbitMQ核心概念
哪些互联网大厂在使用RabbitMQ,为什么?
初识RabbitMQ:
- RabbitMQ是一个开源的消息代理和队列服务器,用来通过普通协议在完全不同的应用之间共享数据,RabbitMQ是使用Erlang语言来编写的,并且RabbitMQ是基于AMQP协议的。
哪些互联网大厂在使用RabbitMQ:
- 滴滴、美团、头条、去哪儿、艺龙
为什么使用RabbitMQ:
- 开源、性能优秀,稳定性保障
- 提供可靠性消息投递模式(confirm) 、返回模式(return)
- 与SpringAMQP完美的整合、API丰富
- 集群模式丰富,表达式配置,HA模式,镜像队列模型
- 保证数据不丢失的前提做到高可靠性、可用性
RabbitMQ高性能的原因
- 主要原因是因为RabbitMQ使用Erlang语言编写,Erlang语言最初在于交换机领域的架构模式,这样使得RabbitMQ在Broker之间进行数据交互的性能是非常优秀的
- Erlang的优点:Erlang有着和原生Socket一样的延迟
AMQP高级消息队列协议与模型
什么是AMQP高级消息队列协议:
- AMQP全称是:Advanced Message Queuing Protocol,所以AMQP翻译过来就是:高级消息队列协议。AMQP定义:是具有现代特征的二进制协议。是一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。
AMQP协议模型:
image.png
AMQP核心概念
- Server:又称Broker, 接受客户端的连接,实现AMQP实体服务
- Connection:连接,应用程序与Broker的网络连接
- Channel:网络信道,几乎所有的操作都在Channel中进行,Channel是进行消息读写的通道。客户端可建立多个Channel,每个Channel代表一个会话任务
- Message:消息,服务器和应用程序之间传送的数据,由Properties和Body组成。Properties可以对消息进行修饰,比如消息的优先级、延迟等高级特性;Body则就是消息体内容
- Virtual host:虚拟主机,用于进行逻辑隔离,就有点类似于NameSpace或Group的概念,是最上层的消息路由。一个Virtual Host里面可以有若干个Exchange和Queue,同一个Virtual Host里面不能有相同名称的Exchange或Queue
- Exchange:交换机,接收消息,根据路由键转发消息到绑定的队列
- Binding:Exchange和Queue之间的虚拟连接,binding中可以包含routing key
- Routing key:一个路由规则,虚拟机可用它来确定如何路由一个特定消息
- Queue:也称为Message Queue,消息队列,保存消息并将它们转发给消费者
RabbitMQ整体架构与消息流转
RabbitMQ整体架构图:
image.png
RabbitMQ消息流转图:
image.png
RabbitMQ环境安装
官方下载地址:
我们知道RabbitMQ是基于Erlang编写的,所以在安装RabbitMQ之前需要确保安装了Erlang环境。RabbitMQ与Erlang是有版本对应关系的,可以参考官方列举的版本对应关系:
例如,我这里要安装3.8.9版本的RabbitMQ,那么按官方的说明,我需要安装 22.3 ~ 23.x 版本的Erlang环境,我这里选择23.1.3版本的Erlang。使用如下命令下载RPM安装包:
[root@rabbitmq01 ~]# cd /usr/local/src
[root@rabbitmq01 /usr/local/src]# wget https://github.com/rabbitmq/erlang-rpm/releases/download/v23.1.3/erlang-23.1.3-1.el7.x86_64.rpm
[root@rabbitmq01 /usr/local/src]# wget https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.8.9/rabbitmq-server-3.8.9-1.el7.noarch.rpm
[root@rabbitmq01 /usr/local/src]# ls
erlang-23.1.3-1.el7.x86_64.rpm rabbitmq-server-3.8.9-1.el7.noarch.rpm
[root@rabbitmq01 /usr/local/src]#
使用yum
命令进行安装,因为yum
可自动解决依赖关系:
[root@rabbitmq01 /usr/local/src]# yum install -y erlang-23.1.3-1.el7.x86_64.rpm
[root@rabbitmq01 /usr/local/src]# yum install -y rabbitmq-server-3.8.9-1.el7.noarch.rpm
RabbitMQ新版本没有提供配置文件的示例,需要自己去Github上下载:
- https://github.com/rabbitmq/rabbitmq-server/blob/master/deps/rabbit/docs/rabbitmq.conf.example
- https://github.com/rabbitmq/rabbitmq-server/blob/master/deps/rabbit/docs/advanced.config.example
将配置文件放到/etc/rabbitmq
目录下:
[root@rabbitmq01 /usr/local/src]# mv rabbitmq.conf.example /etc/rabbitmq/rabbitmq.conf
修改配置文件:
[root@rabbitmq01 ~]# vim /etc/rabbitmq/rabbitmq.conf
# 允许默认用户被外部网络访问
loopback_users.guest = false
完成配置后,启动RabbitMQ Server:
[root@rabbitmq01 ~]# rabbitmq-server start &
检查端口是否正常监听,5672是RabbitMQ的默认端口号:
[root@rabbitmq01 ~]# netstat -lntp |grep 5672
tcp 0 0 0.0.0.0:25672 0.0.0.0:* LISTEN 1922/beam.smp
tcp6 0 0 :::5672 :::* LISTEN 1922/beam.smp
[root@rabbitmq01 ~]#
启用RabbitMQ的管控台插件,我们可以在管控台中查看RabbitMQ的基础监控信息,以及对RabbitMQ进行管理:
[root@rabbitmq01 ~]# rabbitmq-plugins enable rabbitmq_management
使用浏览器访问管控台的15672端口,进入到登录界面,默认用户名密码均为guest:
登录成功,进入到管控台首页:
image.png
rabbitmqctl
命令行操作
rabbitmqctl
基础操作命令:
# 关闭应用
rabbitmqctl stop_app
# 启动应用
rabbitmqctl start_app
# 节点状态
rabbitmqctl status
# 添加用户
rabbitmqctl add user username password
# 列出所有用户
rabbitmqctl list users
# 删除用户
rabbitmqctl delete_user username
# 清除用户权限
rabbitmqctl clear_permissions -p vhostpath username
# 列出用户权限
rabbitmqctl list_user_permissions username
# 修改密码
rabbitmqctl change_password username newpassword
# 设置用户权限
rabbitmqctl set permissions -p vhostpath username ".*" ".*" ".*"
# 创建虚拟主机
rabbitmqctl add vhost vhostpath
# 列出所有虚拟主机
rabbitmqctl list_vhosts
# 列出虚拟主机上所有权限
rabbitmqctl list_permissions -p vhostpath
# 删除虚拟主机
rabbitmqctl delete_vhost vhostpath
# 查看所有队列信息
rabbitmqctl list_queues
# 清除队列里的消息
rabbitmqctl -p vhostpath purge_queue blue
rabbitmqctl
高级操作命令:
# 移除所有数据,要在rabbitmqctIl stop_app之后使用
rabbitmqctl reset
# 组成集群命令
rabbitmqctl join_cluster <clusternode> [--ram]
# 查看集群状态
rabbitmqctl cluster_status
# 修改集群节点的存储形式
rabbitmqctl change_cluster_node_type disc | ram
# 忘记节点(摘除节点)
rabbitmqctl forget cluster_node [--offline]
# 修改节点名称
rabbitmqctl rename_cluster_node oldnode1 newnode1 [oldnode2] [newnode2..]
生产者消费者代码示例
创建一个Maven工程,在pom
文件中添加如下依赖:
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.16</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.10.0</version>
</dependency>
生产者代码示例:
package com.zj.rabbitmq.learn.basic;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import lombok.SneakyThrows;
/**
* 生产者
*
* @author 01
* @date 2020-11-23
**/
public class MyProducer {
@SneakyThrows
public static void main(String[] args) {
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.243.164");
factory.setPort(5672);
factory.setVirtualHost("/");
// 通过连接工厂创建连接
try (Connection connection = factory.newConnection();
// 通过连接创建一个Channel
Channel channel = connection.createChannel()) {
for (int i = 0; i < 5; i++) {
// 通过Channel发送数据
String msg = "Hello RabbitMQ!";
// 不设置Exchange默认走default direct exchange,此时routingKey就是队列名称
channel.basicPublish("", "test001", null, msg.getBytes());
}
}
}
}
消费者代码示例:
package com.zj.rabbitmq.learn.basic;
import com.rabbitmq.client.*;
import lombok.SneakyThrows;
/**
* 消费者
*
* @author 01
* @date 2020-11-23
**/
public class MyConsumer {
@SneakyThrows
public static void main(String[] args) {
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.243.164");
factory.setPort(5672);
factory.setVirtualHost("/");
// 通过连接工厂创建连接
try (Connection connection = factory.newConnection();
// 通过连接创建一个Channel
Channel channel = connection.createChannel()) {
// 声明一个队列,队列不存在会自动创建
channel.queueDeclare("test001", true, false, false, null);
// 创建消费者
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) {
String message = new String(body);
System.out.println("Received: " + message);
}
};
// 持续监听,消费消息
while (true){
channel.basicConsume("test001", true, consumer);
Thread.sleep(1000);
}
}
}
}
先运行消费者,再运行生产者,此时消费者控制台输出如下:
image.png
关于交换机
Exchange(交换机)用于接收消息,并根据路由键转发消息所绑定的队列:
image.png
交换机属性:
- Name:交换机名称
- Type:交换机类型direct、topic、 fanout、 headers
- Durability:是否需要持久化,true为持久化
- Auto Delete:当最后一个绑定到Exchange 上的队列删除后,自动删除该Exchange
- Internal:当前Exchange是否用于RabbitMQ内部使用,默认为False
- Arguments:扩展参数,用于扩展AMQP协议自制定化使用
Direct Exchange
- 所有发送到Direct Exchange的消息被转发到RouteKey中指定的Queue
- 注意:Direct模式可以使用RabbitMQ自带的Exchange:default Exchange,所以不需要将Exchange进行任何绑定(binding)操作。消息传递时,RouteKey必须完全匹配才会被队列接收,否则该消息会被抛弃
生产者代码示例:
package com.zj.rabbitmq.learn.exchange;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import lombok.SneakyThrows;
public class ProducerOfDirectExchange {
@SneakyThrows
public static void main(String[] args) {
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.243.164");
factory.setPort(5672);
factory.setVirtualHost("/");
String exchangeName = "test_direct_exchange";
String routingKey = "test.direct";
// 通过连接工厂创建连接
try (Connection connection = factory.newConnection();
// 通过连接创建一个Channel
Channel channel = connection.createChannel()) {
// 通过Channel发送数据
String msg = "Hello RabbitMQ of Direct Exchange!";
channel.basicPublish(exchangeName, routingKey, null, msg.getBytes());
}
}
}
消费者代码示例:
package com.zj.rabbitmq.learn.exchange;
import com.rabbitmq.client.*;
import lombok.SneakyThrows;
public class ConsumerOfDirectExchange {
@SneakyThrows
public static void main(String[] args) {
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.243.164");
factory.setPort(5672);
factory.setVirtualHost("/");
// 通过连接工厂创建连接
try (Connection connection = factory.newConnection();
// 通过连接创建一个Channel
Channel channel = connection.createChannel()) {
String exchangeName = "test_direct_exchange";
String exchangeType = "direct";
String queueName = "test_direct_queue";
String routingKey = "test.direct";
// 声明一个direct类型的Exchange
channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
// 声明一个队列,队列不存在会自动创建
channel.queueDeclare(queueName, true, false, false, null);
// 将队列绑定到指定的Exchange上
channel.queueBind(queueName, exchangeName, routingKey);
// 创建消费者
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) {
String message = new String(body);
System.out.println("Received: " + message);
}
};
// 持续监听,消费消息
while (true) {
channel.basicConsume(queueName, true, consumer);
Thread.sleep(1000);
}
}
}
}
Topic Exchange
- 所有发送到Topic Exchange的消息被转发到所有关心RouteKey中指定Topic的Queue上
- Exchange将RouteKey和某Topic进行模糊匹配,此时队列需要绑定一个Topic
- 可以使用通配符进行模糊匹配:
- 符号 "#" 匹配一个或多个词
- 符号 "*" 匹配不多不少一个词
- 例如:
- "log.#" 能够匹配到 "log.info.oa"
- "log.*" 只会匹配到 "log.error"
生产者代码示例:
package com.zj.rabbitmq.learn.exchange;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import lombok.SneakyThrows;
public class ProducerOfTopicExchange {
@SneakyThrows
public static void main(String[] args) {
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.243.164");
factory.setPort(5672);
factory.setVirtualHost("/");
String exchangeName = "test_topic_exchange";
String routingKey1 = "user.save";
String routingKey2 = "user.update";
String routingKey3 = "user.delete.abc";
// 通过连接工厂创建连接
try (Connection connection = factory.newConnection();
// 通过连接创建一个Channel
Channel channel = connection.createChannel()) {
// 通过Channel发送数据
String msg = "Hello RabbitMQ of Topic Exchange!";
channel.basicPublish(exchangeName, routingKey1, null, msg.getBytes());
channel.basicPublish(exchangeName, routingKey2, null, msg.getBytes());
channel.basicPublish(exchangeName, routingKey3, null, msg.getBytes());
}
}
}
消费者代码示例:
package com.zj.rabbitmq.learn.exchange;
import com.rabbitmq.client.*;
import lombok.SneakyThrows;
public class ConsumerOfTopicExchange {
@SneakyThrows
public static void main(String[] args) {
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.243.164");
factory.setPort(5672);
factory.setVirtualHost("/");
// 通过连接工厂创建连接
try (Connection connection = factory.newConnection();
// 通过连接创建一个Channel
Channel channel = connection.createChannel()) {
String exchangeName = "test_topic_exchange";
String exchangeType = "topic";
String queueName = "test_topic_queue";
//String routingKey = "user.*";
String routingKey = "user.#";
// 声明一个topic类型的Exchange
channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
// 声明一个队列,队列不存在会自动创建
channel.queueDeclare(queueName, true, false, false, null);
// 将队列绑定到指定的Exchange上
channel.queueBind(queueName, exchangeName, routingKey);
// 创建消费者
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) {
String message = new String(body);
System.out.println("Received: " + message);
}
};
// 持续监听,消费消息
while (true) {
channel.basicConsume(queueName, true, consumer);
Thread.sleep(1000);
}
}
}
}
Fanout Exchange
- 不处理路由键,只需要简单的将队列绑定到交换机上
- 发送到交换机的消息都会被转发到与该交换机绑定的所有队列上
- Fanout交换机转发消息是最快的
生产者代码示例:
package com.zj.rabbitmq.learn.exchange;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import lombok.SneakyThrows;
public class ProducerOfFanoutExchange {
@SneakyThrows
public static void main(String[] args) {
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.243.164");
factory.setPort(5672);
factory.setVirtualHost("/");
String exchangeName = "test_fanout_exchange";
// 通过连接工厂创建连接
try (Connection connection = factory.newConnection();
// 通过连接创建一个Channel
Channel channel = connection.createChannel()) {
for (int i = 0; i < 10; i++) {
// 通过Channel发送数据
String msg = "Hello RabbitMQ of Fanout Exchange!";
channel.basicPublish(exchangeName, "", null, msg.getBytes());
}
}
}
}
消费者代码示例:
package com.zj.rabbitmq.learn.exchange;
import com.rabbitmq.client.*;
import lombok.SneakyThrows;
public class ConsumerOfFanoutExchange {
@SneakyThrows
public static void main(String[] args) {
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.243.164");
factory.setPort(5672);
factory.setVirtualHost("/");
// 通过连接工厂创建连接
try (Connection connection = factory.newConnection();
// 通过连接创建一个Channel
Channel channel = connection.createChannel()) {
String exchangeName = "test_fanout_exchange";
String exchangeType = "fanout";
String queueName = "test_fanout_queue";
// 不设置routingKey
String routingKey = "";
// 声明一个fanout类型的Exchange
channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
// 声明一个队列,队列不存在会自动创建
channel.queueDeclare(queueName, true, false, false, null);
// 将队列绑定到指定的Exchange上
channel.queueBind(queueName, exchangeName, routingKey);
// 创建消费者
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) {
String message = new String(body);
System.out.println("Received: " + message);
}
};
// 持续监听,消费消息
while (true) {
channel.basicConsume(queueName, true, consumer);
Thread.sleep(1000);
}
}
}
}
绑定、队列、消息、虚拟主机
Binding - 绑定:
- Exchange和Exchange、Queue之 间的连接关系
- Binding中可以包含RoutingKey或者参数
Queue - 消息队列:
- 消息队列,实际存储消息数据
- Durability:是否持久化。Durable:是,Transient:否
- Auto delete:如选yes,代表当最后一个监听被移除之后,该Queue会自动被删除
Message - 消息:
- 服务器和应用程序之间传送的数据
- 本质上就是一段数据,由Properties和Payload(Body)组成
- 常用属性:delivery mode、headers(自定义属性)
- Message其他属性:
- content_type、content_encoding、priority
- correlation id、reply_to、expiration、message_id
- timestamp、type、 user_id、app_id、 cluster_id
设置Message属性代码示例:
package com.zj.rabbitmq.learn.message;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import lombok.SneakyThrows;
import java.util.HashMap;
import java.util.Map;
class MyProducer {
@SneakyThrows
public static void main(String[] args) {
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.243.164");
factory.setPort(5672);
factory.setVirtualHost("/");
Map<String, Object> headers = new HashMap<>();
headers.put("a", "1");
headers.put("b", "2");
// 自定义Message的一些属性
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
// 持久化模式
.deliveryMode(2)
// 消息的编码格式
.contentEncoding("UTF-8")
// 消息过期时间
.expiration("15000")
// 设置消息的头
.headers(headers)
.build();
// 通过连接工厂创建连接
try (Connection connection = factory.newConnection();
// 通过连接创建一个Channel
Channel channel = connection.createChannel()) {
for (int i = 0; i < 5; i++) {
// 通过Channel发送数据
String msg = "Hello RabbitMQ!";
// 不设置Exchange默认走direct exchange,此时routingKey就是队列名称
channel.basicPublish("", "test001", properties, msg.getBytes());
}
}
}
}
Virtual host - 虚拟主机:
- 虚拟地址,用于进行逻辑隔离,最上层的消息路由
- 一个Virtual Host里面可以有若干个Exchange和Queue
- 同一个Virtual Host里面不能有相同名称的Exchange或Queue