SpringBoot 与 RabbitMQ
简介
RabbitMQ是一款开源的消息队列中间件, 使用Advanced Message Queuing Protocol (AMQP) 为协议来处理队列消息。由于当下分布式系统越来越普及,消息队列中间件也越来越被得到使用。
和RabbitMQ同类型的消息中间件也有
- ActiveMQ
- RocketMQ
- Kafka
这篇文章暂时不分析几种MQ的比较,主要是写一些SpringBoot与RabbitMQ的使用
Ubuntu 上简单安装 RabbitMQ
首先找一台Linux服务器.
这里我用的是Ubuntu 16.04(ubuntu-xenial)
添加下载信息
> echo "deb http://www.rabbitmq.com/debian/ testing main" >> /etc/apt/sources.list
> curl http://www.rabbitmq.com/rabbitmq-signing-key-public.asc | sudo apt-key add -
> apt-get update
安装RabbitMQ
> apt-get install rabbitmq-server
安装完后咋们看一下RabbitMQ 有没有跑起来
> service rabbitmq-server status
如果跑起来的话会显示RabbitMQ的基本信息,没有的话运行
> service rabbitmq-server start
接下来我们需要一个后台的控制页面来管理我们的RabbitMQ服务, RabbitMQ其实自带了一个管理后台,所以直接激活一下就行了
> rabbitmq-plugins enable rabbitmq_management
接下来我们需要创建一个可以登录的用户,刚开始可以创建一个admin管理员,并给这个用户最高权限
> rabbitmqctl add_user [username] [password]
> rabbitmqctl set_user_tags [username] administrator
> rabbitmqctl set_permissions -p "/" [username] ".*" ".*" ".*"
当用户创建完毕后访问 http://[你的服务器IP]:15672 , 然后会出现登录画面,用刚才创建的用户登录即可
RabbitMQ 的简单概念与使用
RabbitMQ消息队列模型图首先看一下面的模型,简单了来讲就是Publisher将消息发送到Exchange上,然后Exchange通过Routes的健值来发布到Queue里,然后Consumer去Queue里消费消息.
这边有三种常用的Exchange的方式
- Direct Exchange
- Topic Exchange
- Fanout Exchange
下面就对上面三种方式我们来进行一下代码的实现
Direct Exchange
Direct Exchange 是RabbitMQ 默认的交换形式,从图上可以看出, 消息发送方直接将带有Routing Key: green 这个参数发送到Exchange中,然后Exchange再将消息分配到对应的Queue中
Direct Exchange TypeSpring 代码实现如下
Producer 部分
@Component
public class DirectProducer {
@Autowired
private AmqpTemplate rabbitTemplate;
public void send() {
String context = "This direct exchange message";
this.rabbitTemplate.convertAndSend("testqueue" , context);
}
}
代码中将“testqueue” 做为routing key, exchange会将消息发送到名为“testqueue”的Queue里面
Consumer部分
@Component
@RabbitListener(queues = "testqueue")
public class DirectConsumer {
@RabbitHandler
public void consume(String context) {
System.out.println("Direct Exchange Consumer : " + context);
}
}
Consumer对“testqueue” 进行监听然后消费消息。
Topic Exchange
Topic Exchange的模式跟Direct 相似,也需要传routing key,但是转发消息是通过通配符来做的。比如:
-
路由键必须是一串字符,用句号(.) 隔开,比如说 main.queue
-
路由模式必须包含一个 星号(*),主要用于匹配路由键指定位置的一个单词,比如说,一个路由模式是这样子:main.subqueue.*,那么就只能匹配路由键是这样子的:第一个单词是 main,第二个单词是 subqueue。
-
井号(#)就表示相当于一个或者多个单词,例如一个匹配模式是main.subqueue.test.#,那么,以main.subqueue.test.one开头的路由键都是可以的。
首先我们先造3个Consumer分别监听不同的queue,但是这里面监听的queue有绑定不同的通配符规则, 其中Annotation里的key包含了通配符规则
@Component
@RabbitListener(
bindings = @QueueBinding(
value = @Queue(value = "topicQueueA",durable = "true"),
exchange = @Exchange(value = "testTopicExchange",type = ExchangeTypes.TOPIC),
key = "*.topic.A")
)
public class TopicConsumerA {
@RabbitHandler
public void consume(String message) {
System.out.println("Topic Consumer A : " + message);
}
}
@Component
@RabbitListener(
bindings = @QueueBinding(
value = @Queue(value = "topicQueueB",durable = "true"),
exchange = @Exchange(value = "testTopicExchange",type = ExchangeTypes.TOPIC),
key = "*.topic.*" )
)
public class TopicConsumerB {
@RabbitHandler
public void consume(String message) {
System.out.println("Topic Consumer B : " + message);
}
}
@Component
@RabbitListener(
bindings = @QueueBinding(
value = @Queue(value = "topicQueueC",durable = "true"),
exchange = @Exchange(value = "testTopicExchange",type = ExchangeTypes.TOPIC),
key = "main.topic.*")
)
public class TopicConsumerC {
@RabbitHandler
public void consume(String message) {
System.out.println("Topic Consumer C : " + message);
}
}
然后我们通过发送不同routing key来控制那些Consumer能收到消息
this.rabbitTemplate.convertAndSend("testTopicExchange", "main.topic.A", context);
main.topic.A 这条routing key 应该是满足上面三个通配符规则的,所有三个Consumer 都能收到发送的消息
this.rabbitTemplate.convertAndSend("testTopicExchange", "test.topic.A", context);
test.topic.A 只能满足A和B的通配符规则,即只有ConsumerA和ConsumerB能收到消息
this.rabbitTemplate.convertAndSend("testTopicExchange", "main.topic.KK", context);
main.topic.KK 只有ConsumerB和C能收到
Fanout Exchange
Fanout Exchange模式是,不管routing key 只要这个queue绑定到fanout exchange 这些queues 都会收到消息,有点像广播的形式.
Fanout Exchange type接下来我看一下Spring代码的实现部分
@Component
public class FanoutProducer {
@Autowired
private AmqpTemplate rabbitTemplate;
public void send() {
String context = "This fanout exchange message";
rabbitTemplate.convertAndSend("testFanoutExchange","", context);
}
}
Producer将消息发送到"testFanoutExchange"的exchange中,第二个参数因为是Fanout模式所以我们不需要传routing key。
@Component
@RabbitListener(
bindings = @QueueBinding(
value = @Queue(value = "fanoutqueueA",durable = "true"),
exchange = @Exchange(value = "testFanoutExchange",type = ExchangeTypes.FANOUT))
)
public class FanoutConsumerA {
@RabbitHandler
public void consume(String message) {
System.out.println("Fanout Exchange Consumer A : " + message);
}
}
@Component
@RabbitListener(
bindings = @QueueBinding(
value = @Queue(value = "fanoutqueueB",durable = "true"),
exchange = @Exchange(value = "testFanoutExchange",type = ExchangeTypes.FANOUT))
)
public class FanoutConsumerB {
@RabbitHandler
public void consume(String message) {
System.out.println("Fanout Exchange Consumer B : " + message);
}
}
这边consumer中只要将监听的queue绑定到接收消息的"testFanoutExchange" 即可.
这样在fanout这种模式下只要是绑定到指定的exchange上所有的queues都能收到消息
演示代码
以上演示代码可以到 https://github.com/dreamcatchernick/spring-boot-samples 的spring-boot-rabbitmq 目录下载并运行