消息队列之RabbitMQ

2019-12-08  本文已影响0人  Gukson666

AMQP(高级消息队列协议)

在异步通讯中,消息不会立刻到达接收方,而是被存放到一个容器中,当满足一定的条件之后,消息会被容器发送给接收方,这个容器即消息队列,而完成这个功能需要双方和容器以及其中的各个组件遵守统一的约定和规则,AMQP就是这样的一种协议,消息发送与接受的双方遵守这个协议可以实现异步通讯。这个协议约定了消息的格式和工作方式。RabbitMQ是流行的开源消息队列系统,用erlang语言开发。RabbitMQ是AMQP的标准实现。

MQ使用场景

在项目中,将一些无需即时返回且耗时的操作提取出来,进行了异步处理,而这种异步处理的方式大大的节省了服务器的请求响应时间,从而提高了系统的吞吐量。

RabbitMQ模型

消息从"发送端"(publisher)把消息发布到"交换器"(exchange),通常比邮局或邮箱。"交换器"根据"路由关键字"(routing-key)去绑定(binding)一个队列(queue)。然后AMQP代理(broker)向消费者(consumers)传递消息订阅队列(queues),或消费者从队列获取消息。


术语介绍

1. ConnectionFactory、Connection、Channel

2. Queue

Queue(队列)是RabbitMQ的内部对象,用于存储消息。多个消费者可以订阅同一个Queue,这时Queue中的消息会被平均分摊给多个消费者进行处理,而不是每个消费者都收到所有的消息并处理。


3. Message acknowledgment

在实际应用中,可能会发生消费者收到Queue中的消息,但没有处理完成就宕机(或出现其他意外)的情况,这种情况下就可能会导致消息丢失。为了避免这种情况发生,我们可以要求消费者在消费完消息后发送一个回执给RabbitMQ,RabbitMQ收到消息回执(Message acknowledgment)后才将该消息从Queue中移除;如果RabbitMQ没有收到回执并检测到消费者的RabbitMQ连接断开,则RabbitMQ会将该消息发送给其他消费者(如果存在多个消费者)进行处理。这里不存在timeout概念,一个消费者处理消息时间再长也不会导致该消息被发送给其他消费者,除非它的RabbitMQ连接断开。

4. ####Rejecting Messages

拒绝消息。当消费者应用程序收到消息时,该消息的处理可能会成功,也可能不会成功。 消费者可以通过拒绝消息向代理指出消息处理失败(或当时无法完成)。 当拒绝消息时,消费者可以要求代理丢弃或重新发送消息。 当队列中只有一个消费者时,确保您不会通过一次又一次地拒绝并重新发送来自同一个消费者的消息来创建无限的消息传递循环。

5. Message durability

如果我们希望即使在RabbitMQ服务重启的情况下,也不会丢失消息,我们可以将Queue与Message都设置为可持久化的(durable),这样可以保证绝大部分情况下我们的RabbitMQ消息不会丢失。但依然解决不了小概率丢失事件的发生(比如RabbitMQ服务器已经接收到生产者的消息,但还没来得及持久化该消息时RabbitMQ服务器就断电了),如果我们需要对这种小概率事件也要管理起来,那么我们要用到事务。由于这里仅为RabbitMQ的简单介绍,所以这里将不讲解RabbitMQ相关的事务。

6. Prefetch count

前面我们讲到如果有多个消费者同时订阅同一个Queue中的消息,Queue中的消息会被平摊给多个消费者。这时如果每个消息的处理时间不同,就有可能会导致某些消费者一直在忙,而另外一些消费者很快就处理完手头工作并一直空闲的情况。我们可以通过设置prefetchCount来限制Queue每次发送给每个消费者的消息数,比如我们设置prefetchCount=1,则Queue每次给每个消费者发送一条消息;消费者处理完这条消息后Queue会再给该消费者发送一条消息。


7. Exchange

在上一节我们看到生产者将消息投递到Queue中,实际上这在RabbitMQ中这种事情永远都不会发生。实际的情况是,生产者将消息发送到Exchange(交换器,下图中的X),由Exchange将消息路由到一个或多个Queue中(或者丢弃)。RabbitMQ中的Exchange有四种类型,不同的类型有着不同的路由策略,这将在下面介绍。


8. Routing key

生产者在将消息发送给Exchange的时候,一般会指定一个routing key,来指定这个消息的路由规则,而这个routing key需要与Exchange Type及binding key联合使用才能最终生效。
在Exchange Type与binding key固定的情况下(在正常使用时一般这些内容都是固定配置好的),我们的生产者就可以在发送消息给Exchange时,通过指定routing key来决定消息流向哪里。RabbitMQ为routing key设定的长度限制为255 bytes。

9. Binding

RabbitMQ中通过Binding将Exchange与Queue关联起来,这样RabbitMQ就知道如何正确地将消息路由到指定的Queue了。


10. Binding key

在绑定(Binding)Exchange与Queue的同时,一般会指定一个binding key;消费者将消息发送给Exchange时,一般会指定一个routing key;当binding key与routing key相匹配时,消息将会被路由到对应的Queue中。在绑定多个Queue到同一个Exchange的时候,这些Binding允许使用相同的binding key。

binding key 并不是在所有情况下都生效,它依赖于Exchange Type,比如fanout类型的Exchange就会无视binding key,而是将消息路由到所有绑定到该Exchange的Queue。

11. Exchange Types

RabbitMQ常用的Exchange Type有fanout、direct、topic、headers这四种。

binding key与routing key一样也是句点号“. ”分隔的字符串。

上一篇下一篇

猜你喜欢

热点阅读