SpringBoot集成RabbitMQ 简单测试用例
2018-09-10 本文已影响1778人
Hiseico
1.在pom文件中添加依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2.在SpringBoot的配置文件中配置RabbitMQ相关参数
#rabbitmq 此处�部分配置,完全配置请参考springBoot官方帮助文档
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/
#消费者数量
spring.rabbitmq.listener.simple.concurrency=10
#最大消费者数量
spring.rabbitmq.listener.simple.max-concurrency=10
#消费者每次从队列获取的消息数量。写多了,如果长时间得不到消费,数据就一直得不到处理
spring.rabbitmq.listener.simple.prefetch=1
#消费者自动启动
spring.rabbitmq.listener.simple.auto-startup=true
#消费者消费失败,自动重新入队
spring.rabbitmq.listener.simple.default-requeue-rejected=true
#启用发送重试 队列满了发不进去时启动重试
spring.rabbitmq.template.retry.enabled=true
#1秒钟后重试一次
spring.rabbitmq.template.retry.initial-interval=1000
#最大重试次数 3次
spring.rabbitmq.template.retry.max-attempts=3
#最大间隔 10秒钟
spring.rabbitmq.template.retry.max-interval=10000
#等待间隔 的倍数。如果为2 第一次 乘以2 等1秒, 第二次 乘以2 等2秒 ,第三次 乘以2 等4秒
spring.rabbitmq.template.retry.multiplier=1.0
MQConfig.java
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @author Hiseico
* @create 2018-09-10 下午10:25
* @desc
**/
@Configuration
public class MQConfig {
public static final String QUEUE = "queue";
//配置队列
@Bean
public Queue queue(){
return new Queue(QUEUE,true);//队列名称,是否要做持久化
}
}
MQSender.java
import cn.sitcat.redis.RedisService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
/**
* @author Hiseico
* @create 2018-09-10 下午10:24
* @desc 发送者
**/
@Service
public class MQSender {
private static Logger log = LoggerFactory.getLogger(MQSender.class);
/**
* 操作的队列的工具类: 在引入依赖时,已经被注入
*/
@Autowired
AmqpTemplate amqpTemplate;
public void send(Object message) {
String msg =beanToString(message);
log.info("send message:" + msg);
// 直接往队列里放数据
amqpTemplate.convertAndSend(MQConfig.QUEUE, msg);//队列名称, 数据
}
//将bean序列化成json字符串
public static <T> String beanToString(T value) {
if (value == null) {
return null;
}
//如果是基础数据类型,就转换成String并写进去
Class<?> clazz = value.getClass();
if (clazz == int.class || clazz == Integer.class) {
return "" + value;
} else if (clazz == String.class) {
return (String) value;
} else if (clazz == Long.class || clazz == long.class) {
return "" + value;
} else {//如果是bean类型,就转换成json写进去
return JSON.toJSONString(value);
}
}
//将json字符串转化成对应的bean对象
public static <T> T stringToBean(String str, Class<T> clazz) {
if (str == null || str.length() <= 0 || clazz == null) {
return null;
}
if (clazz == int.class || clazz == Integer.class) {
return (T) Integer.valueOf(str);
} else if (clazz == String.class) {
return (T) str;
} else if (clazz == Long.class || clazz == long.class) {
return (T) Long.valueOf(str);
} else {//如果是bean类型,就转换成json写进去
return JSON.toJavaObject(JSON.parseObject(str), clazz);
}
}
MQReceiver.java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;
/**
* @author Hiseico
* @create 2018-09-10 下午10:25
* @desc 消息接收者
**/
@Service
public class MQReceiver {
private static Logger log = LoggerFactory.getLogger(MQReceiver.class);
/**
* Direct模式
*/
@RabbitListener(queues = MQConfig.QUEUE)//监听 名为queue的队列名称
public void receive(String message) {
log.info("receive message:" + message);
}
}
Controller.java
/**
* @author Hiseico
* @create 2018-08-02 下午9:05
* @desc 测试
**/
@Controller
public class testController {
@Autowired
MQSender mqSender;
@RequestMapping("/mq")
@ResponseBody
public void mq() {
//往队列中添加数据
mqSender.send("hello,hiseico");
}
最后,访问这个Controller中的mq方法,并查看使用slf4j打印到控制台的生产者数据和消费者数据。
控制台打印结果.png