rabbitMQ

2020-03-13  本文已影响0人  呆弱鸡

AMQP 0.9.1介绍

AMQP 是什么

AMQP(高级消息队列协议)是一个网络协议,他支持符合要求的客户端应用(application)和消息中间件代理(messaging middleware broker)之间进行通信。

消息代理和他们所扮演的角色

消息代理(message brokers)从发布者(publishers)亦称生产者(producers)那儿接收消息,并根据既定的路由规则把收到的消息发送给处理消息的消费者(consumers)。

由于AMQP是一个网络协议,所以这个过程中的发布者、消费者、消息代理可以存在于不同的设备上。

AMQP 模型简介

消息(message)被发布者(publisher)发送给交换机(exchange),交换机常常被比喻成邮局或者邮箱。然后交换机将收到的消息根据路由规则分发给绑定的队列(queue)。最后AMQP代理会将消息投递给订阅了此队列的消费者,或者消费者按照需求自行获取。

发布者(publisher)发布消息时可以给消息指定各种消息属性(message meta-data)。有些属性有可能会被消息代理(brokers)使用,然而其他的属性则是完全不透明的,它们只能被接收消息的应用所使用。

从安全角度考虑,网络是不可靠的,接收消息的应用也有可能在处理消息的时候失败。基于此原因,AMQP模块包含了一个消息确认(message acknowledgements)的概念:当一个消息从队列中投递给消费者后(consumer),消费者会通知一下消息代理(broker),这个可以是自动的也可以由处理消息的应用的开发者执行。当“消息确认”被启用的时候,消息代理不会完全将消息从队列中删除,直到它收到来自消费者的确认回执(acknowledgement)。

在某些情况下,例如当一个消息无法被成功路由时,消息或许会被返回给发布者并被丢弃。或者,如果消息代理执行了延期操作,消息会被放入一个所谓的死信队列中。此时,消息发布者可以选择某些参数来处理这些特殊情况。

队列,交换机和绑定统称为AMQP实体(AMQP entities)。

AMQP 是一个可编程的协议

AMQP 0-9-1是一个可编程协议,某种意义上说AMQP的实体和路由规则是由应用本身定义的,而不是由消息代理定义。包括像声明队列和交换机,定义他们之间的绑定,订阅队列等等关于协议本身的操作。

这虽然能让开发人员自由发挥,但也需要他们注意潜在的定义冲突。当然这在实践中很少会发生,如果发生,会以配置错误(misconfiguration)的形式表现出来。

应用程序(Applications)声明AMQP实体,定义需要的路由方案,或者删除不再需要的AMQP实体。

交换机和交换机类型

交换机是用来发送消息的AMQP实体。交换机拿到一个消息之后将它路由给一个或零个队列。它使用哪种路由算法是由交换机类型和被称作绑定(bindings)的规则所决定的。AMQP 0-9-1的代理提供了四种交换机

name(交换机类型) default pre-declared names(预声明的默认名称)
direct exchange直连交换机 (Empty string)and amq.direct
fanout exchange扇形交换机 amq.fanout
topic exchange主题交换机 amq.topic
headers exchange头交换机 amq.match (and amq.headers in RabbitMq)

除交换机类型外,在声明交换机时还可以附带许多其他的属性,其中最重要的几个分别是:Durability (消息代理重启后,交换机是否还存在),Auto-delete(当所有与之绑定的消息队列都完成了对此交换机的使用后,删掉它),Arguments(依赖代理本身)。

交换机可以有两个状态:持久(durable)、暂存(transient)。持久化的交换机会在消息代理(broker)重启后依旧存在,而暂存的交换机则不会(它们需要在代理再次上线后重新被声明)。然而并不是所有的应用场景都需要持久化的交换机。

默认交换机

默认交换机(default exchange)实际上是一个由消息代理预先声明好的没有名字(名字为空字符串)的直连交换机(direct exchange)。它有一个特殊的属性使得它对于简单应用特别有用处:那就是每个新建队列(queue)都会自动绑定到默认交换机上,绑定的路由键(routing key)名称与队列名称相同。

举个栗子:当你声明了一个名为"search-indexing-online"的队列,AMQP代理会自动将其绑定到默认交换机上,绑定(binding)的路由键名称也是为"search-indexing-online"。因此,当携带着名为"search-indexing-online"的路由键的消息被发送到默认交换机的时候,此消息会被默认交换机路由至名为"search-indexing-online"的队列中。换句话说,默认交换机看起来貌似能够直接将消息投递给队列,尽管技术上并没有做相关的操作。

直连交换机

直连型交换机(direct exchange)是根据消息携带的路由键(routing key)将消息投递给对应队列的。直连交换机用来处理消息的单播路由(unicast routing)(尽管它也可以处理多播路由)。下边介绍它是如何工作的:

直连交换机经常用来循环分发任务给多个工作者(workers)。当这样做的时候,我们需要明白一点,在AMQP 0-9-1中,消息的负载均衡是发生在消费者(consumer)之间的,而不是队列(queue)之间。

exchange-direct.png

扇形交换机

扇型交换机(fanout exchange)将消息路由给绑定到它身上的所有队列,而不理会绑定的路由键。如果N个队列绑定到某个扇型交换机上,当有消息发送给此扇型交换机时,交换机会将消息的拷贝分别发送给这所有的N个队列。扇型用来交换机处理消息的广播路由(broadcast routing)。

因为扇型交换机投递消息的拷贝到所有绑定到它的队列,所以他的应用案例都极其相似:

主题交换机

*代表一个单词

#代表0个或多个单词

主题交换机(topic exchanges)通过对消息的路由键和队列到交换机的绑定模式之间的匹配,将消息路由给一个或多个队列。主题交换机经常用来实现各种分发/订阅模式及其变种。主题交换机通常用来实现消息的多播路由(multicast routing)。

主题交换机拥有非常广泛的用户案例。无论何时,当一个问题涉及到那些想要有针对性的选择需要接收消息的 多消费者/多应用(multiple consumers/applications) 的时候,主题交换机都可以被列入考虑范围。

使用案例:

头交换机

有时消息的路由操作会涉及到多个属性,此时使用消息头就比用路由键更容易表达,头交换机(headers exchange)就是为此而生的。头交换机使用多个消息属性来代替路由键建立路由规则。通过判断消息头的值能否与指定的绑定相匹配来确立路由规则。

我们可以绑定一个队列到头交换机上,并给他们之间的绑定使用多个用于匹配的头(header)。这个案例中,消息代理得从应用开发者那儿取到更多一段信息,换句话说,它需要考虑某条消息(message)是需要部分匹配还是全部匹配。上边说的“更多一段消息”就是"x-match"参数。当"x-match"设置为“any”时,消息头的任意一个值被匹配就可以满足条件,而当"x-match"设置为“all”的时候,就需要消息头的所有值都匹配成功。

头交换机可以视为直连交换机的另一种表现形式。头交换机能够像直连交换机一样工作,不同之处在于头交换机的路由规则是建立在头属性值之上,而不是路由键。路由键必须是一个字符串,而头属性值则没有这个约束,它们甚至可以是整数或者哈希值(字典)等。

队列

AMQP中的队列(queue)跟其他消息队列或任务队列中的队列是很相似的:它们存储着即将被应用消费掉的消息。队列跟交换机共享某些属性,但是队列也有一些另外的属性。

队列在声明(declare)后才能被使用。如果一个队列尚不存在,声明一个队列会创建它。如果声明的队列已经存在,并且属性完全相同,那么此次声明不会对原有队列产生任何影响。如果声明中的属性与已存在队列的属性有差异,那么一个错误代码为406的通道级异常就会被抛出。

队列参数

  1. x-message-ttl 发送到队列的消息在丢弃之前可以存活多长时间(毫秒)。
  2. x-expires 队列在被自动删除(毫秒)之前可以使用多长时间。
  3. x-max-length 队列在开始从头部删除之前可以包含多少就绪消息。
  4. x-max-length-bytes 队列在开始从头部删除之前可以包含的就绪消息的总体大小。
  5. x-dead-letter-exchange 设置队列溢出行为。这决定了在达到队列的最大长度时消息会发生什么。有效值为drop-head或reject-publish。交换的可选名称,如果消息被拒绝或过期,将重新发布这些名称。
  6. x-dead-letter-routing-key 可选的替换路由密钥,用于在消息以字母为单位时使用。如果未设置,将使用消息的原始路由密钥。
  7. x-max-priority 队列支持的最大优先级数;如果未设置,队列将不支持消息优先级。
  8. x-queue-mode 将队列设置为延迟模式,在磁盘上保留尽可能多的消息以减少内存使用;如果未设置,队列将保留内存缓存以尽快传递消息。
  9. x-queue-master-locator 将队列设置为主位置模式,确定在节点集群上声明时队列主机所在的规则。

队列名称

队列的名字可以由应用(application)来取,也可以让消息代理(broker)直接生成一个。队列的名字可以是最多255字节的一个utf-8字符串。若希望AMQP消息代理生成队列名,需要给队列的name参数赋值一个空字符串:在同一个通道(channel)的后续的方法(method)中,我们可以使用空字符串来表示之前生成的队列名称。之所以之后的方法可以获取正确的队列名是因为通道可以默默地记住消息代理最后一次生成的队列名称。

以"amq."开始的队列名称被预留做消息代理内部使用。如果试图在队列声明时打破这一规则的话,一个通道级的403 (ACCESS_REFUSED)错误会被抛出。

队列持久化

持久化队列(Durable queues)会被存储在磁盘上,当消息代理(broker)重启的时候,它依旧存在。没有被持久化的队列称作暂存队列(Transient queues)。并不是所有的场景和案例都需要将队列持久化。

持久化的队列并不会使得路由到它的消息也具有持久性。倘若消息代理挂掉了,重新启动,那么在重启的过程中持久化队列会被重新声明,无论怎样,只有经过持久化的消息才能被重新恢复。

绑定

绑定(Binding)是交换机(exchange)将消息(message)路由给队列(queue)所需遵循的规则。如果要指示交换机“E”将消息路由给队列“Q”,那么“Q”就需要与“E”进行绑定。绑定操作需要定义一个可选的路由键(routing key)属性给某些类型的交换机。路由键的意义在于从发送给交换机的众多消息中选择出某些消息,将其路由给绑定的队列。

打个比方:

拥有了交换机这个中间层,很多由发布者直接到队列难以实现的路由方案能够得以实现,并且避免了应用开发者的许多重复劳动。

如果AMQP的消息无法路由到队列(例如,发送到的交换机没有绑定队列),消息会被就地销毁或者返还给发布者。如何处理取决于发布者设置的消息属性。

消费者

消息如果只是存储在队列里是没有任何用处的。被应用消费掉,消息的价值才能够体现。在AMQP 0-9-1 模型中,有两种途径可以达到此目的:

使用push API,应用(application)需要明确表示出它在某个特定队列里所感兴趣的,想要消费的消息。如是,我们可以说应用注册了一个消费者,或者说订阅了一个队列。一个队列可以注册多个消费者,也可以注册一个独享的消费者(当独享消费者存在时,其他消费者即被排除在外)。

每个消费者(订阅者)都有一个叫做消费者标签的标识符。它可以被用来退订消息。消费者标签实际上是一个字符串。

消息确认

消费者应用(Consumer applications) - 用来接受和处理消息的应用 - 在处理消息的时候偶尔会失败或者有时会直接崩溃掉。而且网络原因也有可能引起各种问题。这就给我们出了个难题,AMQP代理在什么时候删除消息才是正确的?AMQP 0-9-1 规范给我们两种建议:

前者被称作自动确认模式(automatic acknowledgement model),后者被称作显式确认模式(explicit acknowledgement model)。在显式模式下,由消费者应用来选择什么时候发送确认回执(acknowledgement)。应用可以在收到消息后立即发送,或将未处理的消息存储后发送,或等到消息被处理完毕后再发送确认回执(例如,成功获取一个网页内容并将其存储之后)。

如果一个消费者在尚未发送确认回执的情况下挂掉了,那AMQP代理会将消息重新投递给另一个消费者。如果当时没有可用的消费者了,消息代理会死等下一个注册到此队列的消费者,然后再次尝试投递。

拒绝消息

当一个消费者接收到某条消息后,处理过程有可能成功,有可能失败。应用可以向消息代理表明,本条消息由于“拒绝消息(Rejecting Messages)”的原因处理失败了(或者未能在此时完成)。当拒绝某条消息时,应用可以告诉消息代理如何处理这条消息——销毁它或者重新放入队列。当此队列只有一个消费者时,请确认不要由于拒绝消息并且选择了重新放入队列的行为而引起消息在同一个消费者身上无限循环的情况发生。

否定确认

在AMQP中,basic.reject方法用来执行拒绝消息的操作。但basic.reject有个限制:你不能使用它决绝多个带有确认回执(acknowledgements)的消息。但是如果你使用的是RabbitMQ,那么你可以使用被称作negative acknowledgements(也叫nacks)

预取消息

在多个消费者共享一个队列的案例中,明确指定在收到下一个确认回执前每个消费者一次可以接受多少条消息是非常有用的。这可以在试图批量发布消息的时候起到简单的负载均衡和提高消息吞吐量的作用。

注意,RabbitMQ只支持通道级的预取计数,而不是连接级的或者基于大小的预取。

配置策略policies

导航对管理员>策略>添加/更新操作员策略。
在名称“ ^ amq \”旁边输入“ transient-queue-ttl”。模式旁边的,然后选择应用到旁边的“队列”。
在策略旁边的第一行中输入“ expires” = 1800000。
单击添加策略。

消息属性和有效载荷(消息主题)

AMQP模型中的消息(Message)对象是带有属性(Attributes)的。有些属性及其常见,以至于AMQP 0-9-1 明确的定义了它们,并且应用开发者们无需费心思思考这些属性名字所代表的具体含义。例如:

有些属性是被AMQP代理所使用的,但是大多数是开放给接收它们的应用解释器用的。有些属性是可选的也被称作消息头(headers)。他们跟HTTP协议的X-Headers很相似。消息属性需要在消息被发布的时候定义。

AMQP的消息除属性外,也含有一个有效载荷 - Payload(消息实际携带的数据),它被AMQP代理当作不透明的字节数组来对待。消息代理不会检查或者修改有效载荷。消息可以只包含属性而不携带有效载荷。它通常会使用类似JSON这种序列化的格式数据,为了节省,协议缓冲器和MessagePack将结构化数据序列化,以便以消息的有效载荷的形式发布。AMQP及其同行者们通常使用"content-type" 和 "content-encoding" 这两个字段来与消息沟通进行有效载荷的辨识工作,但这仅仅是基于约定而已。

消息能够以持久化的方式发布,AMQP代理会将此消息存储在磁盘上。如果服务器重启,系统会确认收到的持久化消息未丢失。简单地将消息发送给一个持久化的交换机或者路由给一个持久化的队列,并不会使得此消息具有持久化性质:它完全取决与消息本身的持久模式(persistence mode)。将消息以持久化方式发布时,会对性能造成一定的影响(就像数据库操作一样,健壮性的存在必定造成一些性能牺牲)。

消息确认

由于网络的不确定性和应用失败的可能性,处理确认回执(acknowledgement)就变的十分重要。有时我们确认消费者收到消息就可以了,有时确认回执意味着消息已被验证并且处理完毕,例如对某些数据已经验证完毕并且进行了数据存储或者索引操作。

这种情形很常见,所以 AMQP 0-9-1 内置了一个功能叫做 消息确认(message acknowledgements),消费者用它来确认消息已经被接收或者处理。如果一个应用崩溃掉(此时连接会断掉,所以AMQP代理亦会得知),而且消息的确认回执功能已经被开启,但是消息代理尚未获得确认回执,那么消息会被从新放入队列(并且在还有还有其他消费者存在于此队列的前提下,立即投递给另外一个消费者)。

协议内置的消息确认功能将帮助开发者建立强大的软件。

AMQP 0.9.1方法

让我们来看看交换机类,有一组方法被关联到了交换机的操作上。这些方法如下所示:

(请注意,RabbitMQ网站参考中包含了特用于RabbitMQ的交换机类的扩展,这里我们不对其进行讨论)

以上的操作来自逻辑上的配对:exchange.declare 和 exchange.declare-ok,exchange.delete 和 exchange.delete-ok. 这些操作分为“请求 - requests”(由客户端发送)和“响应 - responses”(由代理发送,用来回应之前提到的“请求”操作)。

如下的例子:客户端要求消息代理使用exchange.declare方法声明一个新的交换机:

exchange-declare.png

如上图所示,exchange.declare方法携带了好几个参数。这些参数可以允许客户端指定交换机名称、类型、是否持久化等等。

操作成功后,消息代理使用exchange.declare-ok方法进行回应:

exchange-declare-ok.png

exchange.declare-ok方法除了通道号之外没有携带任何其他参数(通道-channel 会在本指南稍后章节进行介绍)。

AMQP队列类的配对方法 - queue.declare方法 和 queue.declare-ok有着与其他配对方法非常相似的一系列事件:

queue-declare.png queue-declare-ok.png

不是所有的AMQP方法都有与其配对的“另一半”。许多(basic.publish是最被广泛使用的)都没有相对应的“响应”方法,另外一些(如basic.get)有着一种以上与之对应的“响应”方法。

连接

AMQP连接通常是长连接。AMQP是一个使用TCP提供可靠投递的应用层协议。AMQP使用认证机制并且提供TLS(SSL)保护。当一个应用不再需要连接到AMQP代理的时候,需要优雅的释放掉AMQP连接,而不是直接将TCP连接关闭。

通道

有些应用需要与AMQP代理建立多个连接。无论怎样,同时开启多个TCP连接都是不合适的,因为这样做会消耗掉过多的系统资源并且使得防火墙的配置更加困难。AMQP 0-9-1提供了通道(channels)来处理多连接,可以把通道理解成共享一个TCP连接的多个轻量化连接。

在涉及多线程/进程的应用中,为每个线程/进程开启一个通道(channel)是很常见的,并且这些通道不能被线程/进程共享。

一个特定通道上的通讯与其他通道上的通讯是完全隔离的,因此每个AMQP方法都需要携带一个通道号,这样客户端就可以指定此方法是为哪个通道准备的。

虚拟主机

为了在一个单独的代理上实现多个隔离的环境(用户、用户组、交换机、队列 等),AMQP提供了一个虚拟主机(virtual hosts - vhosts)的概念。这跟Web servers虚拟主机概念非常相似,这为AMQP实体提供了完全隔离的环境。当连接被建立的时候,AMQP客户端来指定使用哪个虚拟主机。

AMQP是可扩展的

AMQP 0-9-1 拥有多个扩展点:

这些特性使得AMQP 0-9-1模型更加灵活,并且能够适用于解决更加宽泛的问题。

AMQP 0.9.1 客户端生态系统

AMQP 0-9-1 拥有众多的适用于各种流行语言和框架的客户端。其中一部分严格遵循AMQP规范,提供AMQP方法的实现。另一部分提供了额外的技术,方便使用的方法和抽象。有些客户端是异步的(非阻塞的),有些是同步的(阻塞的),有些将这两者同时实现。有些客户端支持“供应商的特定扩展”(例如RabbitMQ的特定扩展)。

因为AMQP的主要目标之一就是实现交互性,所以对于开发者来讲,了解协议的操作方法而不是只停留在弄懂特定客户端的库就显得十分重要。这样一来,开发者使用不同类型的库与协议进行沟通时就会容易的多。

下载安装

版本

3.7.14

查看操作系统信息

cat /etc/centos-release

CentOS release 6.8 (Final)

安装rabbit-server

rabbitmq提供的erlang,对第三方插件支持可能会有问题

rpm -ivh [--force] erlang-21.3.4-1.el6.x86_64.rpm

rabbitmq

curl -s https://packagecloud.io/install/repositories/rabbitmq/rabbitmq-server/script.rpm.sh | sudo bash
yum -y install rabbitmq-server

配置文件

/etc/rabbitmq/ (不同安装方式位置会有不同)https://www.rabbitmq.com/configure.html#config-location

端口配置 https://www.rabbitmq.com/install-rpm.html#ports

注意:服务器设置为以系统用户rabbitmq身份运行 。

启动服务器

守护程序

chkconfig rabbitmq-server on

启动停止

/sbin/service rabbitmq-server start
/sbin/service rabbitmq-server stop

图形化管理插件(端口:15672)

/usr/lib/rabbitmq/bin/rabbitmq-plugins list //查看插件安装情况
/usr/lib/rabbitmq/bin/rabbitmq-plugins enable rabbitmq_management //启用rabbitmq_management服务

rabbitmqctl常用操作

https://www.rabbitmq.com/rabbitmqctl.8.html

查询虚拟机rabbitmqctl list_vhosts

添加虚拟机rabbitmqctl add_vhost host_name

删除虚拟机rabbitmqctl delete_vhost host_name

查询用户rabbitmqctl list_users

添加用户rabbitmqctl add_user username pwd

删除用户rabbitmqctl delete_user username

修改密码rabbitmqctl change_password username newpassword

设置用户标签 rabbitmqctl set_user_tags username tag 设置这个才能在页面上登录,tag可以为administrator, monitoring, management

设置权限rabbitmqctl set_permissions [-p <vhost>] <username> <conf> <write> <read>权限配置包括:配置(队列和交换机的创建和删除)、写(发布消息)、读(有关消息的任何操作,包括清除这个队列) 。例如:rabbitmqctl set_permissions -p host_1 linyy ".*" ".*" ".*"
conf:一个正则表达式match哪些配置资源能够被该用户访问。
write:一个正则表达式match哪些配置资源能够被该用户读。
read:一个正则表达式match哪些配置资源能够被该用户访问

查询queuerabbitmqctl list_queues [-p vhost]

查询exchangerabbitmqctl list_exchanges

查询bindingrabbitmqctl list_bindings

linux系统限制

用户打开文件最大数量(生产环境建议:65536)

ulimit -n

os内核允许最大打开文件数量

cat /proc/sys/fs/file-max

日志

/var/log/rabbitmq

集群

将多台计算机连接在一起以形成单个逻辑代理,所有节点必须具有相同的Erlang cookie,计算机之间的网络链接必须可靠,必须运行相同版本的RabbitMQ和Erlang。

虚拟主机,交换,用户和权限将自动镜像到群集中的所有节点。队列可以位于单个节点上,也可以跨多个节点进行镜像。连接到群集中任何节点的客户端可以查看群集中的所有队列,即使它们不在该节点上也是如此。

节点(rabbit@主机名)

所有节点都是对等的,只有queue有主子节点之分。

打破集群 rabbitmqctl reset

默认情况下,如果队列的主节点出现故障,与其对等节点断开连接或从群集中删除,则最旧的镜像将被提升为新的主节点。在某些情况下,此镜像可能 不同步,这将导致数据丢失。从RabbitMQ 3.7.5开始,ha-promote-on-failure 策略密钥控制是否允许不同步的镜像升级。设置为 when-synced时,将确保不提升未同步的镜像。使用when-synced促销策略的系统必须使用 发布者确认,以便检测队列不可用性和代理无法排队消息。

策略policy

一般3节点2个计数,5节点3个计数

ha-mode ha-params result
exactly count 1意味着只有一个主队列。2意味着1个主队列+1个镜像队列。如果集群中的节点数少于计数,则队列镜像到所有节点。如果急群众节点数多余计数,并且包含镜像的节点关闭,则将在另一个节点上创建镜像,使用具有"ha-promote-on-shutdown"的exactly模式,"always"可能是危险的,因为队列可以在集群中迁移并在关闭时变得不同步。(3.7.5版本后,可以用ha-promote-on-failure的when-synced)
all (none) 队列在群集中的所有节点上进行镜像。将新节点添加到群集后,该队列将镜像到该节点。这个设置非常保守。镜像群集节点的法定数量(N / 2 + 1) ,而不是推荐。镜像到所有节点将给所有群集节点带来额外的压力,包括网络I / O,磁盘I / O和磁盘空间使用。
nodes node names 队列镜像到节点名称中列出的节点。节点名称是出现在rabbitmqctl cluster_status中的Erlang节点名称; 它们通常具有“ rabbit @ hostname ” 的形式。如果这些节点名称中的任何一个不是群集的一部分,则这不构成错误。如果在声明队列时列表中没有任何节点处于联机状态,则将在声明客户端所连接的节点上创建队列。
发布者确认

要启用确认,客户端将发送 confirm.select方法。根据是否 设置了无等待,代理可以使用confirm.select-ok进行响应。一旦在 频道上使用confirm.select方法,就会说它处于确认模式。事务渠道不能进入确认模式,一旦渠道处于确认模式,就不能进行事务。

什么时候发布的消息会被broker确认

对于可路由消息,当所有队列都接受消息时,将发送basic.ack。对于路由到持久队列的持久性消息,这意味着持久化到磁盘。对于镜像队列,这意味着所有镜像都已接受该消息。

永久消息的ack延迟

在将消息持久保存到磁盘后,将发送路由到持久队列的持久消息的basic.ack。RabbitMQ消息存储在一段时间(几百毫秒)之后批量传递消息到磁盘,以最小化fsync(2)调用的数量,或者当队列空闲时。这意味着在恒定负载下,basic.ack的延迟 可以达到几百毫秒。为了提高吞吐量,强烈建议应用程序异步处理确认(作为流)或发布批量消息并等待未完成的确认。确切的API因客户端库而异。

发布者返回(退回模式:未投递到queue):
对于返回的消息,mandatory必须将模板的属性设置为true或者mandatory-expression 必须将其评估true为特定消息。
此功能需要将CachingConnectionFactory其publisherReturns属性设置为true。
RabbitTemplate.setReturnCallback通过调用进行注册,将返回值发送给客户端setReturnCallback(ReturnCallback callback)。
回调必须实现以下方法void returnedMessage(Message message, int replyCode, String replyText,String exchange, String routingKey);
ReturnCallback每个仅支持一个RabbitTemplate

发布者确认(确认模式:未投递到exchange):
模板需要CachingConnectionFactory其publisherConfirms属性设置为的true。
RabbitTemplate.ConfirmCallback通过调用确认,发送确认给客户端setConfirmCallback(ConfirmCallback callback)。
回调必须实现此方法void confirm(CorrelationData correlationData, boolean ack, String cause);
CorrelationData是发送原始消息时由客户提供的对象。对的ack错误nack。例如,如果在生成时可用nack,则原因可能包含的原因:一个示例是将消息发送到不存在的交换机时。在这种情况下,代理将关闭渠道。封闭的原因包括在中。
一个ConfirmCallback仅支持一个RabbitTemplate。

集群搭建

搭建各主机上自己的rabbitmq

修改主机名

修改/etc/hosts

拷贝一台/var/lib/rabbitmq/.erlang.cookie到其他,保证文件内容一致

在RabbitMQ集群集群中,必须至少有一个磁盘节点,否则队列元数据无法写入到集群中,当磁盘节点宕掉时,集群将无法写入新的队列元数据信息。(disc,ram)

在集群搭建好之后,需要配置镜像策略,才能同步数据。

单台多实例(适用于开发环境)
开启
RABBITMQ_NODE_PORT=5772 RABBITMQ_SERVER_START_ARGS="-rabbitmq_management listener [{port,15672}]" RABBITMQ_NODENAME=rabbit1 rabbitmq-server -detached
RABBITMQ_NODE_PORT=5773 RABBITMQ_SERVER_START_ARGS="-rabbitmq_management listener [{port,15673}]" RABBITMQ_NODENAME=rabbit2 rabbitmq-server -detached
检查端口
netstat -tnlp | grep 5772
(cli端口号还开着的话:rabbitmqctl -n rabbit2 stop)
加入集群
rabbitmqctl -n rabbit2 stop_app
rabbitmqctl -n rabbit2 reset
rabbitmqctl -n rabbit2 join_cluster rabbit1@hostname -s --ram
rabbitmqctl -n rabbit2 start_app
查看集群cluster信息
rabbitmqctl -n rabbit1 cluster_status

注意事项

默认值

默认创建guest/guest,只有localhost能用,生产环境建议删除guest用户增加安全性。

通道并发

应用程序应该每个线程使用一个channel,而不是跨线程共享相同的channel。

使用通道池,可以将通道池视为特定的同步解决方案,建议使用现有的池库而不是自行开发的解决方案。例如,Spring AMQP 具有即用型通道池功能。

消息确认

https://www.rabbitmq.com/confirms.html

确认模式和事务不能同时使用。

ack的机制会触发消息重复消费的( 一般出在任务超时,或者没有及时返回状态,引起任务重新入队列,重新消费。在rabbtimq里连接的断开也会触发消息重新入队列)。消费任务类型最好要支持幂等性,这样的好处是 任务执行多少次都没关系,顶多消耗一些性能! 如果不支持幂等,比如发送信息? 那么需要构建一个map来记录任务的执行情况! 不仅仅是成功和失败,还要有心跳!!! 这个map在消费端实现就可以了!!! 这里会出现一个问题,有两个消费者 c1, c2 ,一个任务有可能被c1消费,如果再来一次,被c2执行? 那么如何得知任务的情况? 任务派发! 任务做成hash,固定消费者!坚决不要想方设法在mq扩展这个future。一句话,要不保证消息幂等性,要不就用map记录任务状态.

顺序

单个queue在多消费者下不能保证其先后顺序。我们遇到的大多数场景都不需要消息的有序的,如果对于消息顺序敏感,那么我们这里给出的方法是 消息体通过hash分派到队列里,每个队列对应一个消费者,多分拆队列。为什么要这么设计? 同一组的任务会被分配到同一个队列里,每个队列只能有一个worker来消费,这样避免了同一个队列多个消费者消费时,乱序的可能! t1, t2 两个任务, t1 虽然被c1先pop了,但是有可能c2先把 t2 任务给完成了。一句话,主动去分配队列,单个消费者。

幂等性

一个任务消费者消费多次,结果是一致的。

消息重复消费原因:
1、生产者已把消息发送到mq,在mq给生产者返回ack的时候网络中断,故生产者未收到确定信息,生产者认为消息未发送成功,但实际情况是,mq已成功接收到了消息,在网络重连后,生产者会重新发送刚才的消息,造成mq接收了重复的消息
2、消费者在消费mq中的消息时,mq已把消息发送给消费者,消费者在给mq返回ack时网络中断,故mq未收到确认信息,该条消息会重新发给其他的消费者,或者在网络重连后再次发送给该消费者,但实际上该消费者已成功消费了该条消息,造成消费者消费了重复的消息;

解决方案:1.唯一 ID + 指纹码 机制;2.利用 redis 的原子性去实现(mq会生成一个唯一id,或者自己实现唯一id。第1个方案需要根据数据库的主键不重复来实现,数据库要做分库分表。)

延迟队列

实现方法:死信交换机+消息存活时间TTL,不能让这个队列里面的消息被接受到,否则消息一旦被消费,就不存在过期了

作用:定时任务(延迟一定时间执行)

进入死信交换机条件:1.消息被consumer拒收,并且reject方法参数里requeue是false。2.消息过期,即TTL时间到了。3.队列的长度限制满了。

整体设置:x-message-ttl参数,如果整个队列的消息都是相同的,可以设置

单独设置:messageProperties.setExpiration("6000");

x-dead-letter-exchange代表消息过期后,消息要进入的交换机

x-dead-letter-routing-key是配置消息过期后,进入死信交换机的routing-key

spring-amqp

版本

2.1.5.RELEASE

兼容性

Spring Framework的最小版本依赖性是5.1.x.

最小amqp-clientJava客户端库版本为5.4.0。

代码

ParameterizedTypeReference 用于复杂的参数转换

重试功能,可以配置RabbitTemplate使用RetryTemplate来帮助处理代理连接问题

RabbitMQ介绍

开源,支持协议多,重量级,路由、负载均衡、数据持久化支持好

作用

是一个消息系统,允许软件、应用相互连接和扩展,从而组成一个更大的应用,或者将用户设备和数据进行连接,消息系统通过将消息的发送和接收分离来实现应用程序的异步和解偶。

进行数据投递,非阻塞操作或推送通知。实现发布/订阅,或者工作队列。

思想

避免立即执行资源密集型任务并且必须等待它完成。相反,我们安排任务稍后完成。我们将任务封装 为消息并将其发送到队列。在后台运行的工作进程将弹出任务并最终执行作业。

技术亮点

可靠性

持久性机制,投递确认,发布者证实,高可用性机制

灵活的路由

提供多种内置交换机,可以组合起来使用,甚至可以实现自己的交换机类型

集群

多个rabbitMq聚合成一个独立的逻辑代理

联合

对于服务器来说,它比集群需要更多的松散和非可靠链接。为此RabbitMQ提供了联合模型。

高可用的队列

在同一个集群里,队列可以被镜像到多个机器中,以确保当其中某些硬件出现故障后,你的消息仍然安全。

多协议

支持多种消息协议的消息传递,AMQP 0-9-1,0-9和0-8,以及扩展,STOMP,MQTT,AMQP 1.0,HTTP

广泛的客户端

只要是你能想到的编程语言几乎都有与其相适配的RabbitMQ客户端。

可视化管理

RabbitMQ附带了一个易于使用的可视化管理工具,它可以帮助你监控消息代理的每一个环节。

追踪

如果你的消息系统有异常行为,RabbitMQ还提供了追踪的支持,让你能够发现问题所在。

插件系统

RabbitMQ附带了各种各样的插件来对自己进行扩展。你甚至也可以写自己的插件来使用。

商业支持、大型社区,等等

参考

https://www.rabbitmq.com/

http://rabbitmq.mr-ping.com/

https://spring.io/projects/spring-amqp#learn

上一篇下一篇

猜你喜欢

热点阅读