Spring Boot——整合RabbitMQ
2018-10-10 本文已影响7人
莫问以
1、什么是RabbitMQ?
RabbitMQ 即一个消息队列,主要是用来实现应用程序的异步和解耦,同时也能起到消息缓冲,消息分发的作用。常用的消息中间件有ActiveMQ和RabbitMQ,这里先说RabbitMQ。关于RabbitMQ的基本知识,可以参考:http://blog.720ui.com/2017/rabbitmq_action_01_helloworld/
消息中间件最主要的作用是解耦,中间件最标准的用法是生产者生产消息传送到队列,消费者从队列中拿取消息并处理,生产者不用关心是谁来消费,消费者不用关心谁在生产消息,从而达到解耦的目的。在分布式的系统中,消息队列也会被用在很多其它的方面,比如:分布式事务的支持,RPC的调用等等。参考:http://www.ityouknow.com/springboot/2016/11/30/spring-boot-rabbitMQ.html
Windows环境下的安装RabbitMQ可以参考如下:https://www.cnblogs.com/junrong624/p/4121656.html
2、Spring Boot整合RabbitMQ
A、引入RabbitMQ依赖:
<!-- 增加 RabbitMQ 依赖 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
B、配置信息:
#rabbitmq
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
C、注册队列
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitMQConfig {
public static final String QUEUE_NAME = "spring-boot_queue";
@Bean
public Queue queue() {
return new Queue(QUEUE_NAME);
}
}
D、生产者
package com.guxf.demo.service;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import com.guxf.demo.domain.RabbitMQConfig;
@Service
public class Sender {
@Autowired
private AmqpTemplate rabbitTemplate;
public void send() {
System.out.println("我是生產者——我要发送消息啦...");
rabbitTemplate.convertAndSend(RabbitMQConfig.QUEUE_NAME, "HelloWorld!");
}
}
E、接收者
package com.guxf.demo.service;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Service
public class Receiver {
@Autowired
private AmqpTemplate rabbitTemplate;
@RabbitListener(queues = "spring-boot_queue")
public void receiveMessage(String message) {
System.out.println("我是接收者,我接到的消息是——" + message );
}
}
启动Application即可看到成果,下面来看下 RabbitMQ 路由场景:
只需要改队列, 在RabbitMQConfig 中注册转发器,监听等,代码如下:
package com.guxf.demo.domain;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import com.guxf.demo.service.Receiver;
@Configuration
public class RabbitMQConfig {
public static final String QUEUE_NAME = "spring-boot";
public static final String QUEUE_EXCHANGE_NAME = "spring-boot-exchange";
@Bean
public Queue queue() {
// 是否持久化
boolean durable = true;
// 仅创建者可以使用的私有队列,断开后自动删除
boolean exclusive = false;
// 当所有消费客户端连接断开后,是否自动删除队列
boolean autoDelete = false;
return new Queue(QUEUE_NAME, durable, exclusive, autoDelete);
}
@Bean
public TopicExchange exchange() {
// 是否持久化
boolean durable = true;
// 当所有消费客户端连接断开后,是否自动删除队列
boolean autoDelete = false;
return new TopicExchange(QUEUE_EXCHANGE_NAME, durable, autoDelete);
}
@Bean
public Binding binding(Queue queue, TopicExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with(QUEUE_NAME);
}
@Bean
SimpleMessageListenerContainer container(ConnectionFactory connectionFactory,
MessageListenerAdapter listenerAdapter) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setQueueNames(QUEUE_NAME);
container.setMessageListener(listenerAdapter);
return container;
}
@Bean
MessageListenerAdapter listenerAdapter(Receiver receiver) {
return new MessageListenerAdapter(receiver, "receiveMessage");
}
}