分布式相关技术Java

SpringBoot集成RabbitMQ 简单测试用例

2018-09-10  本文已影响1778人  Hiseico

1.在pom文件中添加依赖

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

2.在SpringBoot的配置文件中配置RabbitMQ相关参数

#rabbitmq 此处�部分配置,完全配置请参考springBoot官方帮助文档
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/
#消费者数量
spring.rabbitmq.listener.simple.concurrency=10
#最大消费者数量
spring.rabbitmq.listener.simple.max-concurrency=10
#消费者每次从队列获取的消息数量。写多了,如果长时间得不到消费,数据就一直得不到处理
spring.rabbitmq.listener.simple.prefetch=1
#消费者自动启动
spring.rabbitmq.listener.simple.auto-startup=true
#消费者消费失败,自动重新入队
spring.rabbitmq.listener.simple.default-requeue-rejected=true
#启用发送重试 队列满了发不进去时启动重试
spring.rabbitmq.template.retry.enabled=true 
#1秒钟后重试一次
spring.rabbitmq.template.retry.initial-interval=1000 
#最大重试次数 3次
spring.rabbitmq.template.retry.max-attempts=3
#最大间隔 10秒钟
spring.rabbitmq.template.retry.max-interval=10000
#等待间隔 的倍数。如果为2  第一次 乘以2 等1秒, 第二次 乘以2 等2秒 ,第三次 乘以2 等4秒
spring.rabbitmq.template.retry.multiplier=1.0

MQConfig.java

import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @author Hiseico
 * @create 2018-09-10 下午10:25
 * @desc
 **/
@Configuration
public class MQConfig {
    public  static final String QUEUE = "queue";
    //配置队列
    @Bean
    public Queue queue(){
        return new Queue(QUEUE,true);//队列名称,是否要做持久化
    }
}

MQSender.java

import cn.sitcat.redis.RedisService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

/**
 * @author Hiseico
 * @create 2018-09-10 下午10:24
 * @desc 发送者
 **/
@Service
public class MQSender {

    private static Logger log = LoggerFactory.getLogger(MQSender.class);

    /**
     * 操作的队列的工具类: 在引入依赖时,已经被注入
     */
    @Autowired
    AmqpTemplate amqpTemplate;

    public void send(Object message) {
        String msg =beanToString(message);
        log.info("send message:" + msg);
//        直接往队列里放数据
        amqpTemplate.convertAndSend(MQConfig.QUEUE, msg);//队列名称, 数据
    }

//将bean序列化成json字符串
    public static <T> String beanToString(T value) {
        if (value == null) {
            return null;
        }
        //如果是基础数据类型,就转换成String并写进去
        Class<?> clazz = value.getClass();
        if (clazz == int.class || clazz == Integer.class) {
            return "" + value;
        } else if (clazz == String.class) {
            return (String) value;
        } else if (clazz == Long.class || clazz == long.class) {
            return "" + value;
        } else {//如果是bean类型,就转换成json写进去
            return JSON.toJSONString(value);
        }
    }

    //将json字符串转化成对应的bean对象
    public static  <T> T stringToBean(String str, Class<T> clazz) {
        if (str == null || str.length() <= 0 || clazz == null) {
            return null;
        }

        if (clazz == int.class || clazz == Integer.class) {
            return (T) Integer.valueOf(str);
        } else if (clazz == String.class) {
            return (T) str;
        } else if (clazz == Long.class || clazz == long.class) {
            return (T) Long.valueOf(str);
        } else {//如果是bean类型,就转换成json写进去
            return JSON.toJavaObject(JSON.parseObject(str), clazz);
        }
}

MQReceiver.java

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;

/**
 * @author Hiseico
 * @create 2018-09-10 下午10:25
 * @desc 消息接收者
 **/
@Service
public class MQReceiver {

    private static Logger log = LoggerFactory.getLogger(MQReceiver.class);

    /**
     * Direct模式
     */
    @RabbitListener(queues = MQConfig.QUEUE)//监听 名为queue的队列名称
    public void receive(String message) {
        log.info("receive message:" + message);
    }
}

Controller.java

/**
 * @author Hiseico
 * @create 2018-08-02 下午9:05
 * @desc 测试
 **/
@Controller
public class testController {
    @Autowired
    MQSender mqSender;

    @RequestMapping("/mq")
    @ResponseBody
    public void mq() {
       //往队列中添加数据
        mqSender.send("hello,hiseico");
    }

最后,访问这个Controller中的mq方法,并查看使用slf4j打印到控制台的生产者数据和消费者数据。


控制台打印结果.png
上一篇下一篇

猜你喜欢

热点阅读