RabbitMQ——基础知识与模拟体验
1、什么是RabbitMQ?
前面讲过一篇文件,关于ActiveMQ的,可以对照看一下,链接:https://www.jianshu.com/p/187d5c2a898d
那么,RabbitMQ又是什么呢?
RabbitMQ 是一个消息代理:它接收并转发消息。你可以将其视为邮局:当你将要发布的邮件放在邮箱中时,您可以确信 Postman 先生最终会将邮件发送给收件人。在这个比喻中,RabbitMQ 是一个邮箱,邮局和邮递员。
RabbitMQ 和邮局之间的主要区别在于它不处理纸张,而是接受,存储和转发二进制数据块的消息。
常见术语
1)生产者:一个发送消息的程序是一个生产者。
2)队列:队列类似于邮箱。虽然消息通过 RabbitMQ 在你的应用中传递,但是它们只能存储在队列中。队列只受主机的内存和磁盘限制的限制,它本质上是一个大的消息缓冲区。不同的生产者可以通过同一个队列发送消息,此外,不同的消费者也可以从同一个队列上接收消息。
3)消费者:一个等待接收消息的程序是一个消费者。
rabbitmq.png
上图所示:“P”是我们的生产者,“C”是我们的消费者。中间的框是队列 - RabbitMQ 代表消费者的消息缓冲区。
整个过程非常简单,生产者创建消息,消费者接收这些消息。你的应用程序既可以作为生产者向其他应用程序发送消息,也可以作为消费者,等待接收其他应用程序的消息。其中,存储消息的是消息队列,它类似于邮箱,消息通过消息队列进行投递。
2、下载与安装(windows环境)
参考链接:https://blog.csdn.net/weixin_39735923/article/details/79288578
3、初步体验
引入依赖:
<!-- 引入RabbitMQ 依赖 -->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>3.6.3</version>
</dependency>
创建生产者,生产者连接到 RabbitMQ,发送一条数据:
package com.guxf.demo.rabbit;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class Send {
private final static String QUEUE_NAME = "hello";
public static void main(String[] args) throws IOException, TimeoutException {
// 创建连接
ConnectionFactory factory = new ConnectionFactory();
// 设置 RabbitMQ 的主机名
factory.setHost("localhost");
// 创建一个连接
Connection connection = factory.newConnection();
// 创建一个通道
Channel channel = connection.createChannel();
// 指定一个队列
// queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
// 参数1 queue :队列名
// 参数2 durable :是否持久化
// 参数3 exclusive :仅创建者可以使用的私有队列,断开后自动删除
// 参数4 autoDelete : 当所有消费客户端连接断开后,是否自动删除队列
// 参数5 arguments
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 发送消息
String message = "Hello World!";
// basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
// 参数1 exchange :交换器
// 参数2 routingKey : 路由键
// 参数3 props : 消息的其他参数
// 参数4 body : 消息体
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println("生产者发送的消息是[" + message + "]");
// 关闭频道和连接
channel.close();
connection.close();
}
}
消息接收者:
package com.guxf.demo.rabbit;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
public class Receive {
private final static String QUEUE_NAME = "hello";
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println("我在等待接收消息——");
// 创建队列消费者
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println("消费者收到的消息是[" + message + "]");
}
};
// basicConsume(String queue, boolean autoAck, Consumer callback)
// 参数1 queue :队列名
// 参数2 autoAck : 是否自动ACK
// 参数3 callback : 消费者对象的一个接口,用来配置回调
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}
先运行Send,发出消息,再运行Receive接收消息,即可收到HelloWord,而且可以看到队列名(前提是得启动RabbitMQ):
看到了hello队列.png
4、任务队列
1)任务队列:饭店高峰期时,顾客单子不得不按照下单顺序一个个放在厨房,进行先后炒菜处理,这一堆的单子就是任务队列。
2)消息队列:消息队列(MQ)可以理解成两个应用程序间(生产者消费者间)的通信,例如短信发送模块,因为模块的发送速度跟不上,这时候需要有一个容器,暂存一下,缓解下压力,那么“消息队列”就是在消息的传输过程中保存消息的“容器”。然后短信模块就可以淡定的去消息队列取出要发出的短信内容,进行发送处理。
创建多消息的发送端:
package com.guxf.demo.rabbit;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class NewTask {
private final static String QUEUE_NAME = "ningmo";
public static void main(String[] args) throws IOException, TimeoutException {
// 创建连接
ConnectionFactory factory = new ConnectionFactory();
// 设置 RabbitMQ 的主机名
factory.setHost("localhost");
// 创建一个连接
Connection connection = factory.newConnection();
// 创建一个通道
Channel channel = connection.createChannel();
// 指定一个队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 发送消息
for (int i = 0; i < 10; i++) {
String message = "Task:" + i;
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println("发送的消息为[" + message + "]");
}
// 关闭频道和连接
channel.close();
connection.close();
}
}
接收消息:
package com.guxf.demo.rabbit;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
public class Worker {
private final static String QUEUE_NAME = "ningmo";
public static void main(String[] args) throws IOException, TimeoutException {
// 创建连接
ConnectionFactory factory = new ConnectionFactory();
// 设置 RabbitMQ 的主机名
factory.setHost("localhost");
// 创建一个连接
Connection connection = factory.newConnection();
// 创建一个通道
Channel channel = connection.createChannel();
// 指定一个队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 创建队列消费者
final Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println("收到的消息为[" + message + "]");
try {
doWork(message);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
// acknowledgment is covered below
boolean autoAck = true;
channel.basicConsume(QUEUE_NAME, autoAck, consumer);
}
private static void doWork(String task) throws InterruptedException {
String[] taskArr = task.split(":");
TimeUnit.SECONDS.sleep(Long.valueOf(taskArr[1]));
}
}