消息队列之RabbitMQ
AMQP(高级消息队列协议)
在异步通讯中,消息不会立刻到达接收方,而是被存放到一个容器中,当满足一定的条件之后,消息会被容器发送给接收方,这个容器即消息队列,而完成这个功能需要双方和容器以及其中的各个组件遵守统一的约定和规则,AMQP就是这样的一种协议,消息发送与接受的双方遵守这个协议可以实现异步通讯。这个协议约定了消息的格式和工作方式。RabbitMQ是流行的开源消息队列系统,用erlang语言开发。RabbitMQ是AMQP的标准实现。
MQ使用场景
在项目中,将一些无需即时返回且耗时的操作提取出来,进行了异步处理,而这种异步处理的方式大大的节省了服务器的请求响应时间,从而提高了系统的吞吐量。
RabbitMQ模型
消息从"发送端"(publisher)把消息发布到"交换器"(exchange),通常比邮局或邮箱。"交换器"根据"路由关键字"(routing-key)去绑定(binding)一个队列(queue)。然后AMQP代理(broker)向消费者(consumers)传递消息订阅队列(queues),或消费者从队列获取消息。
- message:由消息头和消息体组成。消息体是不透明的,而消息头则由一系列的可选属性组成,这些属性包括routing-key(路由键)、- priority(相对于其他消息的优先权)、delivery-mode(指出该消息可能需要持久性存储)等。
- producter(Publisher):生产者,一般指生产消息的一端。
- consumer:消费者,一般指消费消息的一端。
- broker:消息队列服务器的实体,也可以理解为代理,不管消费者还是生产者,都需要连接到broker,才能进行生产消费。
术语介绍
1. ConnectionFactory、Connection、Channel
- Connection:RabbitMQ的socket链接,它封装了socket协议相关部分逻辑。
- ConnectionFactory:Connection的制造工厂。
- Channel:与RabbitMQ打交道的最重要的一个接口,大部分的业务操作是在Channel这个接口中完成的,包括定义Queue、定义Exchange、绑定Queue与Exchange、发布消息等。
2. Queue
Queue(队列)是RabbitMQ的内部对象,用于存储消息。多个消费者可以订阅同一个Queue,这时Queue中的消息会被平均分摊给多个消费者进行处理,而不是每个消费者都收到所有的消息并处理。
3. Message acknowledgment
在实际应用中,可能会发生消费者收到Queue中的消息,但没有处理完成就宕机(或出现其他意外)的情况,这种情况下就可能会导致消息丢失。为了避免这种情况发生,我们可以要求消费者在消费完消息后发送一个回执给RabbitMQ,RabbitMQ收到消息回执(Message acknowledgment)后才将该消息从Queue中移除;如果RabbitMQ没有收到回执并检测到消费者的RabbitMQ连接断开,则RabbitMQ会将该消息发送给其他消费者(如果存在多个消费者)进行处理。这里不存在timeout概念,一个消费者处理消息时间再长也不会导致该消息被发送给其他消费者,除非它的RabbitMQ连接断开。
4. ####Rejecting Messages
拒绝消息。当消费者应用程序收到消息时,该消息的处理可能会成功,也可能不会成功。 消费者可以通过拒绝消息向代理指出消息处理失败(或当时无法完成)。 当拒绝消息时,消费者可以要求代理丢弃或重新发送消息。 当队列中只有一个消费者时,确保您不会通过一次又一次地拒绝并重新发送来自同一个消费者的消息来创建无限的消息传递循环。
5. Message durability
如果我们希望即使在RabbitMQ服务重启的情况下,也不会丢失消息,我们可以将Queue与Message都设置为可持久化的(durable),这样可以保证绝大部分情况下我们的RabbitMQ消息不会丢失。但依然解决不了小概率丢失事件的发生(比如RabbitMQ服务器已经接收到生产者的消息,但还没来得及持久化该消息时RabbitMQ服务器就断电了),如果我们需要对这种小概率事件也要管理起来,那么我们要用到事务。由于这里仅为RabbitMQ的简单介绍,所以这里将不讲解RabbitMQ相关的事务。
6. Prefetch count
前面我们讲到如果有多个消费者同时订阅同一个Queue中的消息,Queue中的消息会被平摊给多个消费者。这时如果每个消息的处理时间不同,就有可能会导致某些消费者一直在忙,而另外一些消费者很快就处理完手头工作并一直空闲的情况。我们可以通过设置prefetchCount来限制Queue每次发送给每个消费者的消息数,比如我们设置prefetchCount=1,则Queue每次给每个消费者发送一条消息;消费者处理完这条消息后Queue会再给该消费者发送一条消息。
7. Exchange
在上一节我们看到生产者将消息投递到Queue中,实际上这在RabbitMQ中这种事情永远都不会发生。实际的情况是,生产者将消息发送到Exchange(交换器,下图中的X),由Exchange将消息路由到一个或多个Queue中(或者丢弃)。RabbitMQ中的Exchange有四种类型,不同的类型有着不同的路由策略,这将在下面介绍。
8. Routing key
生产者在将消息发送给Exchange的时候,一般会指定一个routing key,来指定这个消息的路由规则,而这个routing key需要与Exchange Type及binding key联合使用才能最终生效。
在Exchange Type与binding key固定的情况下(在正常使用时一般这些内容都是固定配置好的),我们的生产者就可以在发送消息给Exchange时,通过指定routing key来决定消息流向哪里。RabbitMQ为routing key设定的长度限制为255 bytes。
9. Binding
RabbitMQ中通过Binding将Exchange与Queue关联起来,这样RabbitMQ就知道如何正确地将消息路由到指定的Queue了。
10. Binding key
在绑定(Binding)Exchange与Queue的同时,一般会指定一个binding key;消费者将消息发送给Exchange时,一般会指定一个routing key;当binding key与routing key相匹配时,消息将会被路由到对应的Queue中。在绑定多个Queue到同一个Exchange的时候,这些Binding允许使用相同的binding key。
binding key 并不是在所有情况下都生效,它依赖于Exchange Type,比如fanout类型的Exchange就会无视binding key,而是将消息路由到所有绑定到该Exchange的Queue。
11. Exchange Types
RabbitMQ常用的Exchange Type有fanout、direct、topic、headers这四种。
-
fanout:fanout类型的Exchange路由规则非常简单,它会把所有发送到该Exchange的消息路由到所有与它绑定的Queue中。
上图中,生产者(P)发送到Exchange(X)的所有消息都会路由到图中的两个Queue,并最终被两个消费者(C1与C2)消费。
-
direct:direct类型的Exchange路由规则也很简单,它会把消息路由到那些binding key与routing key完全匹配的Queue中。
以上图为例,routingKey=”error”发送消息到Exchange,则消息会路由到Queue1(amqp.gen-S9b…,这是由RabbitMQ自动生成的Queue名称)和Queue2(amqp.gen-Agl…);如果我们以routingKey=”info”或routingKey=”warning”来发送消息,则消息只会路由到Queue2。如果我们以其他routingKey发送消息,则消息不会路由到这两个Queue中。
-
topic
topic类型的Exchange在匹配规则上进行了扩展,它与direct类型的Exchage相似,也是将消息路由到binding key与routing key相匹配的Queue中,但这里的匹配规则有些不同,binding key中可以存在两种特殊字符“”与“#”,用于做模糊匹配,其中“”用于匹配一个单词,“#”用于匹配多个单词(可以是零个)。
binding key与routing key一样也是句点号“. ”分隔的字符串。
- header
header exchang(自定义交换器),根据自定义的header attribute去匹配不同的queue。