《RabbitMq实战指南》读书笔记
一、消息中间件
消息队列中间件是指利用高效可靠的消息传递机制进行与平台无关的数据交流,并基于数据通信来进行分布系统的集成。通过提供消息传递和消息排队模型,可以在分布式环境下扩展进程间的通信。可以在分布式环境下提供应用解耦、弹性伸缩、冗余存储、流量削峰、异步通信、数据同步等功能。当前场景的消息中间件有ActiveMQ、RabbitMQ、Kafka、RocketMQ、ZeroMQ等
二、RabbitMQ概念介绍
RabbitMQ是采用Erlang语言实现AMQP的消息中间件,最初起源于金融系统。用于在分布式系统中存储转发消息.
AMQP:高级消息队列协议,由Cisco、RedHat、iMatix等在2006年联合指定。是应用层协议的一个开放标准.
RabbitMQ特点:
- 可靠性
如持久化、传输确认及发布确认等 - 灵活路由
- 扩展性
- 高可用性
- 多种协议
RabbitMQ除了原生支持AMQP协议,还支持STOMP、MQTT等多种消息中间件协议 - 多语言支持
- 易用的用户管理界面
- 插件机制
2.1 RabbitMQ核心概念解读:
表头 | 表头 |
---|---|
Connection | TCP连接 |
Channel | 通道。AMQP 0-9-1支持单一TCP连接多路复用,可以在单一连接开启多个通道作为“轻量级连接” |
Exchange | 拿邮局举例,就是邮局邮箱,作为消息中转站 |
Routing Key | 拿邮局举例,就是邮件发送地址 |
Queue | 拿邮局举例,收货方邮箱。用于存储消息。 |
Binding | 通过绑定,可将交换器和队列进行关联,绑定时会指定routing-key,交换器通过routing-key来决定消息的走向。 |
Broker | rabbitmq 服务器 |
Virtual Host | RabbitMQ是一个多租户系统,链接、队列、绑定、用户权限等等都是单属于某个Virtual Host。Virtual Host提供了资源的逻辑分组和隔离,所以在与RabbitMQ建立链接时,都需要指定需要连接哪个Virtual Host。 |
-
Connection和Channel
Connection就可以完成信道的工作,为何还需要引入信道呢?一个应用程序需要由很多个线程从RabbitMQ中消费消息,或者生产消息,那么就需要建立很多个connection,及很多个TCP连接。然后对于操作系统而言,建立和销毁TCP连接是非常昂贵的开销,遇到使用高峰,性能瓶颈也会随之显现。所以RabbitMQ采用类似NIO的做法,选择TCP连接复用,减少性能开销的同时也易于管理。
2.2 生产者Publisher
生产者消息元数据以及投递信息
生产者每次投递消息时,投递的内容包括消息元数据(message metadata)和投递信息(delivery information)。
投递信息:
属性 | 类型 | 描述 |
---|---|---|
Delivery tag | 正数 | |
Redelivered | Boolean | |
Exchange | String | 交换器名称 |
Routing key | String | 路由键 |
Consumer tag | String | 消费者唯一标识 |
消息属性:
对应于java类库中的 com.rabbitmq.client.AMQP.BasicProperties
属性 | 类型 | 描述 | 是否必选 |
---|---|---|---|
Delivery mode | 1 or 2 | 1:代表消息是瞬时的,不需要持久化;2:代表消息是持久化的 | 是 |
Type | String | 可以设置为任意字符串,用于传达本消息的类型 | 否 |
Headers | Map (string => any) | 消息头 | 否 |
Content type | String | 消息内容类型,就是平时请求中的Content-type,比如发送消息是一个json,则设置为application/json ,决定消息如何进行序列化和反序列化 | 否 |
Content encoding | String | 消息编码格式,决定消息如何进行序列化和反序列化 | 否 |
Message ID | String | 顾名思义,消息的ID | 否 |
Correlation ID | String | 请求ID | 否 |
Reply To | String | 回调队列名称 | 否 |
Expiration | String | 消息TTL | 否 |
Timestamp | Timestamp | 消息时间戳 | 否 |
User ID | String | 用户ID | 否 |
App ID | String | 应用ID | 否 |
生产者确认
RabbitMQ提供了多种生产者confirm机制来跟踪保证哪些消息被正确成功接收处理。
异步confirm
通过设置回调函数com.rabbitmq.client.ConfirmCallback或com.rabbitmq.client.ConfirmListener,当服务端确认了一条或多条消息后,客户端会调用此回调函数。基本步骤为:
1.开启channel的生产者确认
2.维护一个map,记录每一条发出去的消息
3.当收到一个消息的ACK,则将此消息从map中移除
4.当收到一个拒绝或者异常情况下,将此消息移除并定时重发
批量confirm(影响吞吐量)
每发送一批消息之后,将调用channel.waitForConfirms方法等待服务器的返回,期间如果某一条发送失败(Basic.nack或超时情况),则将重试重新发送所有消息。基本步骤为:
1.开启channel的生产者确认
2.等待所有消息的确认(即调用channel.waitForConfirms)
3.当所有消息确认到达,开始进行下一批消息的发送
4.如果有消息拒绝或超时情况,则可以根据情况重发所有消息或相关的消息
发送并立即等待confirm
该模式就相当于批量confirm的时候,消息的数量为1个。听起来就影响吞吐,所以不建议使用。
事务机制
运转流程(AMQP 0-9-1协议):
1)生产者建立连接、开启信道
2)生产者声明交换器,并设置交换机类型、是否持久化等属性
3)生产者声明队列,并设置比如是否排它、是否持久化、是否自动删除等属性
4)生产者通过路由键将交换器和队列进行绑定
5)生产者发送消息到RabbitMQ Broker,其中包含路由键、交换器等信息
5)如果对应交换器不存在,则会发送错误,并且会关闭当前通道。
6)如果对应交换器存在,则对应的交换器根据接收到的Routing key查找相匹配的队列
7)如果找到,则将消息存入此队列
8)如果没找到,则假如生产者配置参数mandatory为false(默认)时,此消息将被丢弃或者如果该交换机存在备份交换机,则将消息路由至备份交换器中;如果mandatory为true,则此消息将被返回至生产者,此时生产者如果配置了消息返回处理器,则将由其进行处理。
9)关闭信道
10)关闭连接
2.3 消费者Consumer
每一个Consumer都会有一个Consumer Tag来唯一标识自己。
两种消费模式:
1)推模式
由Broker主动推送消息至消费端,实时性较好,不过需要流制机制来确保服务端推送过的消息不会压垮消费端
2) 拉模式
指由消费端主动向Broker请求拉去定量或定时的消息,实时性较差,但是可以根据自身处理能力控制拉取的消息量
两种应答模式
注册consumer时可以选择两种应答模式:
1)Automatic
2)Manual
当使用手动应答模式时,又有多种应答方式:
应答方式 | |
---|---|
basic.reject | 拒绝已经投递过来的消息,然后指示RabbitMQ Broker丢弃这些消息或者重新入队。不支持批量拒绝 |
basic.nack | 同样也是拒绝消息,支持basic.reject提供的所有功能同时,弥补其不足,提供批量拒绝消息的能力,当执行basic.nack时,将参数multiple设置为true,将拒绝delivery_tag所指示之前的所有已被投递的消息且未确认的消息 |
消息分发
假如一个队列同时有多个消费者进行监听消费,此时队列收到的消息将会以轮询的方式分发消息给消费者,即某队列有A,B两消费者,每次队列收到消息第一次分发给A,下次收到消息则分发给B,下下次则A,以此类推。这样会有一个什么问题呢?假如A处理的速度快,而B处理的速度慢,此时就导致了A可能发生空闲,从而导致整体应用吞吐量下降。
那么怎么解决上述问题使得资源利用最大化呢?AMQP协议指定basic.qos用来限制一个channel或一个connection最大未确认消息的数量,也就是说如果某channel或connection当前未确认的消息达到了先前通过basic.qos设置的值,那么队列将不会再投递消息给此channel,直到此消费者未确认消息数量符合条件。
channel.basicQos(int prefetchSize, int prefetchCount, boolean global);
channel.basicQos(int prefetchSize, boolean global);
channel.basicQos(int prefetchSize)
参数 | 含义 |
---|---|
prefetchSize | 限制未确认消息数量的大小,单位:B,设置为0则表示无限制 |
prefetchCount | 限制未确认消息数量的数量 ,设置为0则表示无限制 |
需要注意的是,channel可以同时消费多个队列,此时RabbitMQ需要协调各个队列确保发送的未确认消息总和不能超过预定值,这个操作会使得RabbitMQ的性能降低。所以global参数的含义在AMQP 0-9-1和RabbitMQ中进行实现的稍有不同:
global参数 | AMQP 0-9-1 | RabbitMQ |
---|---|---|
true | 整个connection所有未确认消息需要遵循prefetch的限定值 | 整个channel所有未确认消息需要遵循prefetch的限定值 |
false | 整个channel所有未确认消息需要遵循prefetch的限定值 | 对于此channel,新加入的消费者未确认消息需要遵循prefetch的限定值 |
Channel channel = ...;
Consumer consumer1 = ...;
Consumer consumer2 = ...;
channel.basicQos(10, false); // Per consumer limit
channel.basicQos(15, true); // Per channel limit
channel.basicConsume("my-queue1", false, consumer1);
channel.basicConsume("my-queue2", false, consumer2);
上面这个例子就表示consumer1和consumer2总共只能存在最大15条未确认消息,其中单条最大10条。
消费者优先级
当队列具有多个消费者时,消费者优先级功能保证当高优先级处于active状态时(即能够接收消息),所有消息将优先分发至高优先级的消费者中,除非高优先级消费者处于block状态(可能因为basic.qos限制,也可能是因为网络阻塞等情况),会将消息派发给低优先级消费者,那么如果某个队列的所有优先级均相等,那么就是轮询的策略,轮流派发。
设置消费者优先级,可以在客户端通过设置可选参数x-priority进行指定:
Channel channel = ...;
Consumer consumer = ...;
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-priority", 10);
channel.basicConsume("my-queue", false, args, consumer);
运转流程
1)消费者建立连接、开启信道
2)消费者请求消费指定队列消息,并设置相应回调函数
3)等待RabbitMQ Broker回应并投递消息至相应队列
4)消费者确认接收到的消息
5)RabbitMQ从队列 中删除相应已经被确认的消息
6)关闭信道
7)关闭连接
2.5 交换机Exchange
Exchange概览
RabbitMQ的工作过程都是首先将消息发送至一个称为Exchange的玩意中,然后由Exchange确定将此消息路由到哪个队列中。所以可以把Exchange理解为消息中转站。RabbitMQ交换器有fanout、direct、topic、headers4种类型,不同的类型分别代表交换器在选择将消息路由派发至哪个队列种的不同的策略:
-
fanout
忽视routing-key,将消息分别分发至与自己绑定的所有队列中。在上述示例图中,MessageA、MessageB、MessageC全部会分别进入到每一个队列中。 -
direct
direct类型也会根据routing-key和binding-key进行匹配,不过需要完全匹配,就类似equlas判断。
在上述示例中,此种类型下,只有MessageA可以进入到queueC队列,其他均无法路由。 -
topic
topic类型的Exchange在路由消息时,其路由规则为根据routing-key和bingding-key进行模糊匹配路由;RabbitMQ会根据“.” 对bingding-key进行分割,分割开的每段代表一个单词,并且binding-key中可以存在通配符*和#用作模糊匹配,“*”可以匹配一个单词,“#”可以匹配0个或多个单词。
上述示例中,此种类型西,按照匹配规则,MessageA将只进入到queueB和queueC;MessageB将只进入到queueA和queueB中;MessageC将只进入到queueB中。 -
headers
不依赖路由键的匹配规则路由消息,根据发送的消息内容中的headers属性进行匹配,性能差,不实用。
备份交换机(Alternate Exchanges,AE)
当生产者发送消息到Exchange时,Exchange无法进行路由时,此时可以为该交换器设置一个备份交换器,意思就是当主Exchange无法进行消息路由时,那么将这个消息交给备份交换机来路由处理。
在指定备份交换器时,若备份交换器本身不存在,则会记录一个警告。如果备份交换器无法进行路由,那么若此备份交换器也配置了自己的备份交换器,则消息又会继续发往备份交换器的备份交换器进行处理,以此类推。
在客户端可以在声明交换器的同时,添加可选参数alternate-exchange来为当前交换器指定一个备份交换器:
Map<String, Object> args = new HashMap<String, Object>();
args.put("alternate-exchange", "my-ae");
channel.exchangeDeclare("my-direct", "direct", false, false, args);
channel.exchangeDeclare("my-ae", "fanout");
channel.queueDeclare("routed");
channel.queueBind("routed", "my-direct", "key1");
channel.queueDeclare("unrouted");
channel.queueBind("unrouted", "my-ae", "");
在上述示例中,my-direct交换器的备份交换器为my-ae,当my-direct中消息无法进行路由,则此消息会进入到my-ae,由其进行路由处理。
属性:
1)type
2)durable
3)autoDelete
4)internal
5)argument
可以设置备份交换器,备份交换器:当主交换器中消息找不到能够路由的队列时,将消息发送给备份交换器
2.6 队列 Queue
2.6.1 队列属性
先进先出的数据结构。具有以下属性:
属性 | 功能 |
---|---|
Name | Queue的Name用来做队列唯一标识以被应用使用,name最大长度为255个字节。其中以 "amq." 开头的队列名称保留给RabbitMQ Broker内部使用,如果尝试以"amp."开头进行命名我们自己的队列,则将会收到403错误码。 |
Durable | 持久性。设置为true则表示此队列需要持久化,其元数据将存储在磁盘中,并且重启会恢复;否则即表示不需要进行持久化,其元数据存储在内存中,将会在异常重启、断点等情况下消失。 |
Exclusive | 是否排他,如果此属性设置为true,则声明的队列仅对声明此队列的Connection可见,并且当此Connection断开之后,此队列也会被删除。如果其他的Connection试图使用一个不是自己声明且排他的队列时,将会收到"cannot obtain exclusive access to locked queue"RESOURCE_LOCKED异常。 |
Auto-delete | 是否自动删除,如果当一个队列的最后一个消费者被Cancel或者断开后,此队列将被删除。这里有个前提是最后一个,说明此前队列有过消费者,如果队列自始至终没有过消费者,则不会进行删除。 |
Arguments | 可选参数。用于一些插件以及像TTL、队列长度等特性。 |
2.6.3 队列长度限制
队列的长度限制可以是通过限制消息的数量(length),也可以是限制所有消息的字节大小总和(size)。可以通过policy方式或者在客户端声明队列时通过可选参数指定,如果这两种方式都存在,则最终允许的最大限制按照两者最小值来决定。需要注意的时这里的限制只统计所有ready状态的消息,并不包括消费端未确认的消息。
通过设置可选参数x-max-length或x-max-length-bytes在声明队列时实现
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-max-length", 10);
channel.queueDeclare("myqueue", false, false, false, args);
当队列长度设置了消息最大数量或者最大容量时,如果当前达到这个限制条件,则RabbitMQ默认的行为是将队列最前端即最早的消息移除或者置为死信消息。
overflow代表当达到限制时的行为:
1)drop-head
默认行为
2)reject-publish
通过可选参数x-overflow进行设置:
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-overflow", "reject-publish");
channel.queueDeclare("myqueue", false, false, false, args);
2.6.3 队列类型
1)死信队列
当消息变为死亡状态时,即死信消息后,在下述这些情形下,此消息会被重新进行投递给一个死信交换器(DLX,Dead Letter Exchange),那么与这个死信交换机绑定的队列即为死信队列。
- 消息配置TTL并已过期(如果因为队列过期而导致消息丢失不算在内)
- 消息被消费端使用basic.reject 拒绝,或者在参数requeue设置为false下使用basic.nack否定了消息
- 消息因为队列长度达到限制被丢弃
正常声明的交换机Exchange可以作为队列的死信交换器(DLX)。可以通过Policy或声明队列时指定可选参数x-dead-letter-exchange为队列配置DLX:
channel.exchangeDeclare("some.exchange.name", "direct");
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-dead-letter-exchange", "some.exchange.name");
channel.queueDeclare("myqueue", false, false, false, args);
当消息需要被重新投递至队列对应的死信交换器中时,也可以通过可选参数x-dead-letter-routing-key重新指定此消息的routing-key,若不指定,则继承原有的routing-key。
2)优先级队列
RabbitMQ 从3.5.0版本开始提供支持优先级队列。所谓优先级队列,那么就是如果队列中出现排队的情况,优先消费高优先级的消息。RabbitMQ仅支持客户端声明队列时通过可选参数x-max-priority将此队列设置为优先级队列。x-max-priority的值范围为1-255,不过官方推荐设置为最大值为10.
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-max-priority", 10);
chanel.queueDeclare("my-priority-queue", true, false, false, args);
声明完优先级队列之后,当发送消息的时候,就可以为此消息带上优先级的属性,默认不设置则为0.
AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
builder.priority(8);
channel.basicPublish("my-priority-queue","routing-key",builder.build(),"message".getBytes());
需要注意的是每个队列的每个优先级级别都有内存和磁盘上的成本。而且CPU成本也很高,尤其是在消费时,因此不希望创建比较大的级别,即x-max-priority不要设置过大,官网建议1-10。
3)延迟队列
AMQP协议或RabbitMQ并未直接提供延迟队列的功能,但是可以通过消息TTL和死信队列配合达到延迟队列的目的。当一个消息在指定TTL时间过期后,会被投递到指定其所指定的死信交换器上,进而被投递至与该DXL绑定的死信队列中,所以此死信队列中的消息即为指定时间后的消息,达到延迟的目的。
4)惰性队列(Lazy Queues)
从RabbitMQ 3.6.0开始,支持惰性队列。其含义就是惰性队列将会将消息内容尽可能早的移至磁盘中,只有当消费者请求时,才会再加载到内存中。
惰性队列的一个重要的目标就是为了能够支持更长的队列,也就是能容得下更多,数百万的消息数据。而造成队列能够有这么多消息产生消息堆积的原因有多种:消费者下线/崩溃/停机维护、突然消息激增,生成者的生成消息速度大于消费者的消费速度等。
默认情况下,RabbitMQ会将到达队列中的消息存储在内存中,以便快速将消息投递给消费端,即使是持久性消息,在写往磁盘的同时,内存中也会驻留一份。当RabbitMQ需要释放内存的时候,就需要批量将内存中的数据进行换页至磁盘中,这个操作会耗费比较长的时间,且会阻塞队列的操作,使得无法接收到新的消息。
基于上述,惰性队列会将接收到的消息直接存入磁盘,不管是否持久化。这样避免了内存的消耗,但是会增加IO的使用,当然如果消息本身就是持久化的,这块IO的消耗也是不可避免的。
在客户端声明队列时,可为其添加可选参数x-queue-mode来指定是否为惰性队列,默认x-queue-mode为default:
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-queue-mode", "lazy");
channel.queueDeclare("myqueue", false, false, false, args);
2.7 消息过期时间(Time To Live)
RabbitMQ允许我们给消息设置TTL,即存活时间。当消息在队列中存在的时间超出了TTL配置,则视为死亡消息(Dead Message)。RabbitMQ提供了两种方式设置消息TTL。
2.7.1 针对队列中所有message设置
当给队列设置了TTL之后,则此队列中的所有消息都会相同的TTL时间。需要注意的是当一个消息分发路由到不同的队列时,可能会有不同的死亡时间或者根本不会死亡,在一个队列中的消息的死亡不会影响到其他队列中相同消息。
RabbitMQ提供了两种方式为队列设置TTL,单位毫秒:
1)在客户端声明队列时,设置Arguments参数进行声明
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-message-ttl", 60000);
channel.queueDeclare("myqueue", false, false, false, args);
2)通过Policy方式
rabbitmqctl set_policy TTL ".*" '{"message-ttl":60000}' --apply-to queues
如果将队列TTL设置为0,则当消息到达队列时即视为过期,除非消息能够立刻推送至消费者。
2.7.2 给消息单独设置TTL
TTL还可以指定给具体的消息。通过当调用basic.publish推送消息时,设置expiration属性设置TTL,单位也是毫秒。
byte[] messageBodyBytes = "Hello, world!".getBytes();
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
.expiration("60000")
.build();
channel.basicPublish("my-exchange", "routing-key", properties, messageBodyBytes);
注意: 如果上述两种方式一起混用,则最终消息的TTL为两者取最小。
2.8 队列过期时间(Time To Live)
RabbitMQ依然支持给队列设置过期时间。队列的过期时间表示的含义是:当队列处于未使用状态(即此队列上没有任何消费者,也没有被重新声明过,并且也没有被使用basic.get使用拉模式获取消息过)时,超出了配置的TTL时间,则视为过期将被自动删除。如果遇到重启等,并且队列是持久化的,则该队列的TTL会重新计算。
给队列设置TTL也拥有两种方式:
1)通过在客户端代码中声明队列时,指定arguments参数中的x-expires
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-expires", 1800000);
channel.queueDeclare("myqueue", false, false, false, args);
2)通过policy方式设置
rabbitmqctl set_policy expiry ".*" '{"expires":1800000}' --apply-to queues
三、RabbitMQ 管理
3.1 用户管理
创建用户:rabbitmqctl add_user {username} {password}
更改用户密码:rabbitmqctl change_password {username} {password}
清除用户密码(免密登陆):rabbitmqctl clear_password {username}
删除用户:rabbitmqctl delete_user {username}
列举用户:rabbitmqctl list_users
3.2 角色管理
角色类型:
1)none
2)management
3)policymaker
4)monitoring
5)administartor
3.3 多租户以及权限
多租户的概念就是创建使用不同的vhost,称之为虚拟主机,每个vhost可以看做是小型rabbitmq服务,不同vhost之间的队列交换器等绝对隔离,不能互相绑定。
创建vhost:rabbitmqctl add_vhost {vhost}
删除vhost:rabbitmqctl delete_vhost {vhost}
rabbitmq权限粒度是vhost,所以给用户设置权限,也是以vhost为粒度,设置某用户可以对哪些vhost进行操作
rabbitmqctl set_permissions [-p vhost] {user} {conf}{write}{read}
3.4 Web界面管理
RabbitMQ management插件提供了web管理界面
启用插件:rabbitmq-plugins enable rabbitmq_management
停用插件:rabbitmq-plugins disable rabbitmq_management
访问:http://{ip}:15672