RabbitMQ(二)实战
准备工作
RabbitMQ是一个消息传递者,你可以将他看作是一个 邮局,你只需将信息放进"邮筒" 他会帮助你完成信息的传达
安装
linux下载页面--- 需要解压
ubuntu到这--- 自动安装
配置
安装完成之后,如果安装方式是解压,请配置环境变量
RABBITMQ_HOME = #解压目录#
然后是新建配置文件: 在/etc/rabbitmq/
下新建以下两个文件
rabbitmq-env.conf
环境信息配置
RABBITMQ_NODE_IP_ADDRESS=127.0.0.1
RABBITMQ_NODE_PORT=5672
RABBITMQ_NODENAME=node01
rabbitmq.config
核心配置文件(不要把 . 漏掉了)
[{rabbit, [{loopback_users, []}]}].
配置完成之后可以使用以下命令开启RabbitMQ (需要root)
rabbitmq-server
如果安装方式是自动安装,那么则需要进行重启,因为安装完成后自动启动了(我是使用暴力方式重启的)
输入命令 sudo lsof -i :25672
来查找当前运行的 rabbitmq 对应的pid
然后使用sudo kill-9 #pid#
来关闭
然后就可以使用 rabbitmq-server
来开启服务了
启动
第一次启动 可以看到completed with 0 plugins
这样的信息
RabbitMQ提供了UI管理页面(需要重启)我们可以通过rabbitmq-plugins enable rabbitmq_management
命令来开启
重启后日志应该如下
上面可以看到 completed with 6 plugins
说明UI页面已经开启,那么可以访问http://localhost:15672
来访问 rabbitmq的UI页面
默认使用
用户名:guset
密码:guset
进行登录
创建用户
这里使用命令创建:
rabbitmqctl add_user test test
rabbitmqctl set_user_tags test administrator
tag分为四种"management", "policymaker", "monitoring" "administrator"
详见
http://www.rabbitmq.com/management.html
使用 test -- test重新登录
准备工作到此就结束了,下面开始讲解案例,因为后面会整合Spring Boot所以案例都运行在Spring Boot 包环境下
Java RabbitMQ Demo
官方给各种语言的多种应用提供了样例
本文很多案例中的概念在RabbitMQ(一)都提到过,这里可能会重复讲解,复习一下
案例一:"Hello World"
所有的开始都是 "Hello World"
在这个样例中,我们会使用两个java程序 分别完成
生产者:完成一条消息的发送
消费者:接收信息并打印
P代表生产者,C代表消费者,中间的框是一个队列(queue)——RabbitMQ为消费者保留的消息缓冲区。
RabbitMQ支持多种协议。本教程使用AMQP 0-9-1
,这是一种开放的、通用的消息传递协议。
pom.xml中添加下面代码来引入rabbitMQ的包 (版本号不限制)
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>4.0.2</version>
</dependency>
还需要引入SL4J包,RabbitMQ对此包有依赖。。。 因为整个项目是基于 Spring Boot包环境下开发,已经引入了SL4J 所以不需要再单独引入
如果是单独运行的请自行添加
首先编写一个发送类
我们创建一个 Send 类来完成发送操作:
定义(常量)队列名
main方法中:
这里我们将会连接到本地的代理上,因此是 localhost , 如果我们想要连接到其他机器上, 可以使用对应的 IP地址
然后就是创建一个channel,我们大多数操作都会通过channel提供的API来完成
接着,我们声明一个 队列(queue)以发送消息:
queueDeclare()
声明队列是幂等的,当且仅当其不存在的时候才会创建。不同队列间根据队列名区分
消息是通过二进制数组的方式传递的(getBytes()
)
最后关闭资源:
接收类(Receiving)
这是用来发布消息的,我们的消费者(consumer)从RabbitMQ推送消息,不像一般的发布器只发布一条消息,这将一直保持运行以监听和打印多条消息。
建立连接、通道的过程跟上面的一致,更多的是需要建立一个consumer
对象 对接收到的消息进行处理
运行"消费者"(Recv)
开启监听类此时再运行(Send)
接收到信息
同时,当你没有开启接收类的时候,发送类发送的消息不会丢失,而是缓存在队列(queue)里面。当你开启接受类的同时,会自动收到来自队列里面的消息。消息被接收后就会通知RabbitMQ将队列里面的缓存清理,不管消息有没有被处理完
Here's the whole Recv.java class.
案例二:Work queues
接下来是part2
在这一项中,我们将创建一个工作队列,用于在多个工作者之间分配耗时的任务。
Work Queues 的主要思想是,避免立即执行资源密集的任务而不得不等待其执行完成。我们将任务封装为消息并将其发送到队列中,在后台运行的一个工作进程将会弹出任务并最终执行该任务。当你管理许多工作节点时,任务就会在他们之间共享。
这个概念在web应用程序中尤其有用,因为在一个短HTTP请求窗口中不可能处理复杂的任务。
下面我们发送一个特殊的String , 用 Thread.sleep() 辅助,来模拟一些耗时的工作。
用 点 来简单表示一个任务的复杂度,例如 Hello..
表示此任务需要两秒进行处理
在Send.java的基础上修改来构造一个新的类 NewTask.java:
NewTask.java 辅助方法在Recv.java的基础上修改,来构造一个新的类 Worker.java:
Worker.java首先执行一个简单的样例:
设置输入参数为:
然后分别运行NewTask.java 和 Worker.java
run NewTask run Worker(因为当前是单应用运行)在Worker.java中,需要等待一定的时间才能执行完成
开启多个Worker应用
即对Worker.java运行多次
Worker1 Worker2默认情况下,RabbitMQ将按顺序将每个消息发送给下一个消费者。平均每个消费者将得到相同数量的消息。这种分发消息的方式称为循环(平均分配)。
消息答复
完成一项任务可能需要一定的时间。你可能会想,如果一个消费者开始一项长时间的任务,并且只完成了一部分,那么会发生什么。在我们当前的代码中,一旦RabbitMQ向客户发送一条消息,它立即将其标记为删除。在这种情况下,如果您强制关闭了一个工作节点,我们将丢失它正在处理的消息。我们还将丢失发送给这个特定工作者的所有消息,但是还没有处理。
通常我们不希望因为一个节点挂掉而丢失任何消息,而希望能将这些消息传递给其他存活节点进行处理。
确保消息能够不丢失,RabbitMQ支持 message acknowledgments
( 消息答复),一条特定的消息被接收后,返回一个ack告诉RabbitMQ可以随意地进行删除
如果一个消费者挂了(通道关闭,TCP连接关闭)就不能发送ack给RabbitMQ,此时RabbitMQ就会意识到,某条消息没有被处理完成,那么就会将其重新发送到其他的消费者。这种处理流程保证了信息不会丢失,即便偶尔有消费者挂掉。
没有任何消息超时,RabbitMQ将在某个消费者挂掉后重新传递消息。即使处理消息需要很长时间。
缺省情况下,手动消息确认将被打开。在前面的例子中,我们通过autoAck=true
标记显式地关闭了它们。此时将这个标志设置为false,并在完成任务后向员工发送适当的确认信息。
修改 Worker.java
System.out.println(" [x] Done");//下面添加
channel.basicAck(envelope.getDeliveryTag(), false);
boolean autoAck = true; // 改为
boolean autoAck = false;
重新开始运行任务:
fourth....
执行时, 强制关闭Worker
fourth....
任务会自动传递到其他的Worker(或者重新打开Worker时重新执行)
使用修改后的代码,即便任何时刻关闭节点都不会出现丢失消息的情况(当然,这不能避免 整个RabbitMQ重启的情况下数据的丢失)
注意,一旦开启手动确认消息答复,就不能忘记发送回执(ack),否则会导致信息堆积(queue中的消息一直不会被删除)
为了调试这种错误,您可以使用rabbitmqctl
来打印messagesun_unacknowledged
文件:
sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
消息持久化
我们已经学会了如何确保即使消费者挂掉,任务也不会丢失。但是如果RabbitMQ服务器停止,我们的任务仍然会丢失。
当RabbitMQ退出或崩溃时,它将会忘记队列和消息,除非您告诉它不要这样做。需要有两件事来确保消息不会丢失:我们需要将队列和消息标记为持久的。
首先,我们需要确保RabbitMQ永远不会丢失我们的队列。为了实现这一目的,我们需要将其声明为持久的:
boolean durable = true;
channel.queueDeclare("hello", durable, false, false, null);
尽管这个命令本身是正确的,但它在我们当前的设置中是无效的。这是因为我们已经定义了一个名为hello的队列,它不是持久的。RabbitMQ不允许您重新定义具有不同参数的现有队列,并将返回任何试图执行此操作的程序的错误。但是有一个快速的解决方法——让我们声明一个有不同名称的队列,例如taskqueue:
boolean durable = true;
channel.queueDeclare("task_queue", durable, false, false, null);
这个队列名更改需要重新应用到“生产者”和“消费者”中
此时,我们确信即使RabbitMQ重新启动,任务队列队列也不会丢失。现在,我们需要通过将MessageProperties
(implements BasicProperties
)设置为PERSISTENT_TEXT_PLAIN
来标记开启消息持久化
import com.rabbitmq.client.MessageProperties;
channel.basicPublish("", "task_queue",
MessageProperties.PERSISTENT_TEXT_PLAIN,
message.getBytes());
将消息标记为持久性并不能完全保证消息不会丢失。尽管它告诉RabbitMQ将消息保存到磁盘上,但是当RabbitMQ接受消息并没有保存它时,仍然有一个很短的时间窗口。另外,RabbitMQ不会为每条消息执行fsync(2)——它可能只是保存到缓存中,而不是真正写到磁盘上。持久性保证并不强大,但对于我们的简单任务队列来说,这已经足够了。
公平分配
RabbitMQ在消息进入队列时仅发送一条消息。它不考虑消费者的未确认消息的数量。它只是盲目地将每个m个消息发送给第n个消费者(平均分配)。
这样可能导致一个消费者一直忙碌(很倒霉地接收到所有费时间的任务),而另一个消费者则一直很空闲(接收到的都是轻任务)
我们可以通过(consumer中)设置 prefetchCount = 1
来避免这个问题
int prefetchCount = 1;
channel.basicQos(prefetchCount);
即将分配策略设为每次只分配一个任务,下一个任务交给首先完成第一个完成任务的消费者,如此类推
案例三:Publish/Subscribe
上一个案例中,默认是一个任务只交付给一个消费者进行处理,在本案例中,一个任务会交给多个消费者去处理(即 publish / subscribe 模式)。
为了说明这个模式,我们将构建一个简单的日志记录系统。它将由两个程序组成——第一个将发出日志消息,第二个将接收并打印它们。
在我们的日志系统中,接收程序的每一个运行副本都会得到消息。这样,我们就可以运行两个接收器,一个将日志引导到磁盘;另一个在屏幕上打印日志。
Exchanges
上一个案例中,我们使用Queue完成了消息的发送和接收,现在我们来介绍一下Rabbit 的消息模型
首先再解释一下几个概念
生产者(producer):发送消息的用应用
队列(queue):消息的缓冲存储区
消费者(consumer) :接收消息的用户应用
RabbitMQ
的核心思想是,一条消息不会直接从生产者到队列,通常情况下,生产者甚至不知道消息有没有被传递到任何队列中。
生产者会首先将消息交付给交换器(exchanges)。交换器做的事情很简单--将从生产者(producer)得到的消息传递到队列中(queue)
至于怎么传递?是传给任意一个还是传递给多个或者是抛弃....根据策略不同有不同做法
这里我们需要推翻之前的一些误区
我们之前使用
channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
来进行消息发布,其中第二个参数的意义其实并不是queueName
,而是routing key
这个我们后面会提到
这里先解惑,因为可能会有人想:之前不是要使用queueName
来指定希望发送到的queue
?怎么现在又说消息不会直接从生产者发送到队列?
这里是因为,上面
basicPublish
函数的第一个参数(exchangeName
)是空字符串,所以消息会先传递给默认exchange,其类型是direct
,策略是将消息传递到routing key
与binding key
完全匹配的队列,由于默认 exchange 无法设置binding key
,而是默认使用queueName
来作为binding key
进行配对,所以使用 默认exchange 的时候,routing key
相当于你想发送到的queueName
下图X为交换器(Exchanges)
RabbitMQ提供的有四种策略(在教程(一)中提到过)这里再复述一下
direct :消息路由到那些binding key与routing key完全匹配的Queue中。
topic:它与direct类型的Exchage相似,也是将消息路由到binding key与routing key相匹配的Queue中,不过有 分隔符 ' . '以及允许通配符'*' -> 匹配一个单词 ; '#'->任意多个单词
headers :不依赖于routing key与binding key的匹配规则来路由消息,而是根据发送的消息内容中的headers属性进行匹配。(键值对匹配)
fanout :将消息发送到与该exchanges绑定的所有queue
使用:
channel.exchangeDeclare("logs", "fanout");
第一个参数是exchange名称,第二个参数是exchange类型(本案例中就需要用这种类型来实现Logger)
使用命令: sudo rabbitmqctl list_exchanges
用于查看当前RabbitMQ的exchanges列表
在本教程的前几部分中,我们对exchange
一无所知,但仍然能够向队列发送消息。这是因为我们使用的是默认的交换,我们通过空字符串("")来识别。
我们之前是怎么发布消息的呢?
channel.basicPublish("", "hello", null, message.getBytes());
第一个参数就是 exchange的名称,空字符会使用默认的无名 exchange
现在我们可以使用刚刚定义的exchange来发送消息
channel.basicPublish( "logs", "", null, message.getBytes())
临时队列
Temporary queues
每次连接到RabbitMQ都是一个新的队列,名字由server 随机命名;
当与消费者断开连接时,队列自动被删除
我们可以使用以下代码来生成一个 临时,独立的,自动删除的队列
String queueName = channel.queueDeclare().getQueue();
这时,queueName包含一个随机的队列名。例如,它可能看起来像q-jzty20brgko-hjjjj0wlg
绑定
Bindings
我们可以将队列(queue)与交换器(exchange)之间的关系成为binding
使用以下代码进行queue与exchange之间的绑定
channel.queueBind(queueName, "logs", "");
此时,logs
exchange就会将消息传递到我们的队列中
查看绑定(binding列表)
rabbitmqctl list_bindings
案例四:Routing
此案例中,我们会尝试更多的特性,例如,仅仅将 错误信息传递到 Log 文件中(节省空间),同时仍能将所有信息打印到控制台上
Binding的同时可以设置一个额外的参数 routingKey
。为了避免跟basic_publish
的参数混淆,这里称其为 binding key
channel.queueBind(queueName,EXCHANGE_NAME,"black")
binding key
的作用 还依赖于 exchange 的类型,比如 fanout
就会忽略 binding key
binding key
跟 routing key
完全匹配
案例五:Topics
之前提到过 Topics 的策略,这里就不讲解案例了,列几个图来自行领悟
案例六:RPC
同上