Using RabbitMQ with Spring Boot: These Articles Are All You Need (Configuration)

2020-06-11  我是一颗小虎牙_

Readers are welcome to follow the author's personal blog for more information.
Author: Surpasser
Link: https://surpass.org.cn

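Preface

The previous article covered the messaging patterns of RabbitMQ. This one builds a working example of each pattern with Spring Boot and RabbitMQ, producing and consuming messages.

This is also the last article on this topic. Reading it alongside the earlier ones will make it easier to follow.

The demo project is on Gitee for anyone who wants to pull it and try it out.

Gitee: https://gitee.com/lemon_ant/os.git

Main Content

Preparation

Create a new Spring Boot project

Add the configuration file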

server.port=8080

spring.application.name=cl
# IP of the server running RabbitMQ
spring.rabbitmq.host=127.0.0.1
# Connection port
spring.rabbitmq.port=5672
# Username
spring.rabbitmq.username=root
# Password
spring.rabbitmq.password=123456
# Enable publisher confirms
spring.rabbitmq.publisher-confirm-type=correlated
# Enable returns for messages that could not be delivered
spring.rabbitmq.publisher-returns=true
# Virtual host
spring.rabbitmq.virtual-host=/

Add the dependencies to pom.xml

<dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.junit.vintage</groupId>
                    <artifactId>junit-vintage-engine</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-rabbit-test</artifactId>
            <scope>test</scope>
        </dependency>
</dependencies>

Application class

@SpringBootApplication
public class OsApplication {
    public static void main(String[] args) {
        SpringApplication.run(OsApplication.class, args);
    }

}

Point-to-Point Mode

  1. Queue initialization

    // The queue is created automatically if it does not already exist
    @Configuration
    public class PointInitialization {
        @Bean
        Queue toPoint(){
            // Second argument: durable = true, so the queue survives a broker restart
            Queue queue = new Queue("point.to.point",true);
            return queue;
        }
    }
    
  2. Producer

    @Component
    public class PointProducer {
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        public void send(String name){
            String sendMsg = "Point-to-point queue: " + name + "   " + new Date();
            // Send to the queue by name (the queue name is used as the routing key)
            this.rabbitTemplate.convertAndSend("point.to.point",sendMsg);
        }
    }
    
  3. Consumer

    @Component
    public class PointConsumer {
        // Name of the queue to listen on
        @RabbitListener(queues = "point.to.point")
        public void processOne(String name) {
            System.out.println("point.to.point:" + name);
        }
    
    }
    
  4. Test class (simulating the controller layer)

    @RestController
    @RequestMapping("/Point")
    public class PointController {
        @Autowired
        private PointProducer sayProducer;
    
        @RequestMapping("/point/{name}")
        public String send(@PathVariable String name){
            sayProducer.send(name);
            return "Sent successfully";
        }
    }
    
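  5. Simulate a request with Postman, for example a GET request to http://localhost:8080/Point/point/{name} with the application running locally

  6. Console output (screenshot unavailable)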
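Why does sending to a queue name work without declaring any exchange? The two-argument convertAndSend(routingKey, message) publishes to the template's default exchange, which is the broker's nameless default exchange unless the template is configured otherwise, and that exchange delivers to the queue whose name equals the routing key. The class below is a toy in-memory model of that rule, not Spring or RabbitMQ code; DefaultExchangeSketch and its methods are hypothetical names chosen for illustration.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

/** Toy in-memory model of the default exchange (hypothetical, not a Spring or RabbitMQ API). */
final class DefaultExchangeSketch {
    // Declared queues, keyed by queue name.
    private final Map<String, Queue<String>> queues = new HashMap<>();

    /** Declares a queue if it does not exist yet, similar to what the Queue bean triggers. */
    void declareQueue(String name) {
        queues.computeIfAbsent(name, k -> new ArrayDeque<>());
    }

    /** Routes by exact match between routing key and queue name; false means unroutable. */
    boolean publish(String routingKey, String message) {
        Queue<String> target = queues.get(routingKey);
        if (target == null) {
            return false;
        }
        target.add(message);
        return true;
    }

    /** Takes the next message from a queue, or null if the queue is empty or unknown. */
    String poll(String queueName) {
        Queue<String> queue = queues.get(queueName);
        return queue == null ? null : queue.poll();
    }

    public static void main(String[] args) {
        DefaultExchangeSketch broker = new DefaultExchangeSketch();
        broker.declareQueue("point.to.point");
        broker.publish("point.to.point", "hello");
        System.out.println(broker.poll("point.to.point")); // prints hello
    }
}
```

In this model a message published with a routing key that matches no queue is simply reported as unroutable, which is the situation the publisher-returns setting is meant to surface.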

Work Mode

  1. Queue initialization

    @Configuration
    public class WorkInitialization {
        // The queue is created automatically if it does not already exist
        @Bean
        Queue work(){
            Queue queue = new Queue("WorkingMode",true);
            return queue;
        }
    }
    
  2. Producer

    @Component
    public class WorkProducer {
    
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        public void send(String name){
            String sendMsg = "Work mode: " + name + "   " + new Date();
            // Send to the queue by name
            this.rabbitTemplate.convertAndSend("WorkingMode",sendMsg);
        }
    }
    
  3. Consumer

    // Three consumers listen on the same queue
    @Component
    public class WorkConsumer {
        @RabbitListener(queues = "WorkingMode")
        public void processOne(String name) {
            System.out.println("WorkingMode1:" + name);
        }
    
        @RabbitListener(queues = "WorkingMode")
        public void processTwo(String name) {
            System.out.println("WorkingMode2:" + name);
        }
    
        @RabbitListener(queues = "WorkingMode")
        public void processThree(String name) {
            System.out.println("WorkingMode3:" + name);
        }
    
    }
    
  4. Test class (simulating the controller layer)

    @RestController
    @RequestMapping("/work")
    public class WorkController {
    
        @Autowired
        private WorkProducer sayProducer;
    
        @RequestMapping("/work/{name}")
        public String send(@PathVariable String name){
            sayProducer.send(name);
            return "Sent successfully";
        }
    }
    
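  5. Simulate a request with Postman, for example a GET request to http://localhost:8080/work/work/{name} with the application running locally

  6. Console output (screenshot unavailable)

Note the timestamps: messages are dispatched round-robin, and each message is consumed by exactly one consumer.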
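Round-robin dispatch can be pictured with a few lines of plain Java. This is only a sketch of the idea, not how the broker is implemented: RoundRobinSketch is a hypothetical name, consumers are represented by their index, and in a real deployment the split also depends on prefetch settings and on how quickly each consumer acknowledges.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy round-robin dispatcher (hypothetical; real dispatch happens inside the broker). */
final class RoundRobinSketch {
    /** Returns, for each message in order, the index of the consumer that receives it. */
    static List<Integer> dispatch(int messageCount, int consumerCount) {
        List<Integer> assignment = new ArrayList<>();
        for (int i = 0; i < messageCount; i++) {
            // Each message goes to exactly one consumer, taking turns.
            assignment.add(i % consumerCount);
        }
        return assignment;
    }

    public static void main(String[] args) {
        // Six messages shared by the three listeners of the WorkingMode queue.
        System.out.println(dispatch(6, 3)); // prints [0, 1, 2, 0, 1, 2]
    }
}
```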

Publish/Subscribe Mode

  1. Queue initialization

    // Exchange type: fanout
    @Configuration
    public class PublishInitialization {
    
        // The queues are created automatically if they do not already exist
        @Bean
        Queue publishOne(){
            Queue queue = new Queue("queue.publish.one",true);
            return queue;
        }
        @Bean
        Queue publishTwo(){
            Queue queue = new Queue("queue.publish.two",true);
            return queue;
        }
        @Bean
        Queue publishThree(){
            Queue queue = new Queue("queue.publish.three",true);
            return queue;
        }
    
        // Create the exchange
        @Bean
        FanoutExchange publishExchange(){
            FanoutExchange fanoutExchange = new FanoutExchange("publishExchange");
            return fanoutExchange;
        }
    
        // Bind the queues (no routing key needed). Each Queue parameter name must match its bean name,
        // because several Queue beans exist
        @Bean
        Binding bindingPublishOne(Queue publishOne,FanoutExchange publishExchange){
            Binding binding = BindingBuilder.bind(publishOne).to(publishExchange);
            return binding;
        }
        @Bean
        Binding bindingPublishTwo(Queue publishTwo,FanoutExchange publishExchange){
            Binding binding = BindingBuilder.bind(publishTwo).to(publishExchange);
            return binding;
        }
        @Bean
        Binding bindingPublishThree(Queue publishThree,FanoutExchange publishExchange){
            Binding binding = BindingBuilder.bind(publishThree).to(publishExchange);
            return binding;
        }
    }
    
  2. Producer

    @Component
    public class PublishProducer {
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        public void send(String name){
            String sendMsg = "Publish/subscribe mode: " + name + "   " + new Date();
            // Publish to the fanout exchange; the routing key is ignored, so it is left empty
            this.rabbitTemplate.convertAndSend("publishExchange","",sendMsg);
        }
    }
    
  3. Consumer

    @Component
    public class PublishConsumer {
        @RabbitListener(queues = "queue.publish.one")
        public void processOne(String name) {
            System.out.println("queue.publish.one:" + name);
        }
    
        @RabbitListener(queues = "queue.publish.two")
        public void processTwo(String name) {
            System.out.println("queue.publish.two:" + name);
        }
    
        @RabbitListener(queues = "queue.publish.three")
        public void processThree(String name) {
            System.out.println("queue.publish.three:" + name);
        }
    }
    
  4. Test class (simulating the controller layer)

    @RestController
    @RequestMapping("/Publish")
    public class PublishController {
    
        @Autowired
        private PublishProducer sayProducer;
    
        @RequestMapping("/publish/{name}")
        public String send(@PathVariable String name){
            sayProducer.send(name);
            return "Sent successfully";
        }
    }
    
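  5. Simulate a request with Postman, for example a GET request to http://localhost:8080/Publish/publish/{name} with the application running locally

  6. Console output (screenshot unavailable)

Note the timestamps: the exchange pushes each message to every queue bound to it.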
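The fanout behaviour can be sketched in memory as follows. This is an illustrative model under simple assumptions, not the Spring AMQP API; FanoutSketch and its methods are hypothetical names. It shows one published message ending up as one copy per bound queue, regardless of the routing key.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy in-memory fanout exchange (hypothetical, not part of Spring AMQP). */
final class FanoutSketch {
    // Bound queues in binding order: queue name -> messages delivered so far.
    private final Map<String, List<String>> boundQueues = new LinkedHashMap<>();

    /** Binds a queue to the exchange; no routing key is involved. */
    void bind(String queueName) {
        boundQueues.putIfAbsent(queueName, new ArrayList<>());
    }

    /** Copies the message into every bound queue and returns the number of copies. */
    int publish(String ignoredRoutingKey, String message) {
        for (List<String> queue : boundQueues.values()) {
            queue.add(message);
        }
        return boundQueues.size();
    }

    /** Returns the messages held by one queue (empty if the queue is not bound). */
    List<String> messagesIn(String queueName) {
        return boundQueues.getOrDefault(queueName, Collections.emptyList());
    }

    public static void main(String[] args) {
        FanoutSketch exchange = new FanoutSketch();
        exchange.bind("queue.publish.one");
        exchange.bind("queue.publish.two");
        exchange.bind("queue.publish.three");
        System.out.println(exchange.publish("", "hello")); // prints 3
    }
}
```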

Routing Mode

  1. Queue initialization

    // Exchange type: direct
    @Configuration
    public class RoutingInitialization {
    
        // The queues are created automatically if they do not already exist
        @Bean
        Queue routingOne(){
            Queue queue = new Queue("queue.routing.one",true);
            return queue;
        }
        @Bean
        Queue routingTwo(){
            Queue queue = new Queue("queue.routing.two",true);
            return queue;
        }
        @Bean
        Queue routingThree(){
            Queue queue = new Queue("queue.routing.three",true);
            return queue;
        }
    
        // Create the exchange
        @Bean
        DirectExchange routingExchange(){
            DirectExchange directExchange = new DirectExchange("routingExchange");
            return directExchange;
        }
    
        // Bind each queue with its own routing key
        @Bean
        Binding bindingRoutingOne(Queue routingOne,DirectExchange routingExchange){
            Binding binding = BindingBuilder.bind(routingOne).to(routingExchange).with("1");
            return binding;
        }
        @Bean
        Binding bindingRoutingTwo(Queue routingTwo,DirectExchange routingExchange){
            Binding binding = BindingBuilder.bind(routingTwo).to(routingExchange).with("2");
            return binding;
        }
        @Bean
        Binding bindingRoutingThree(Queue routingThree,DirectExchange routingExchange){
            Binding binding = BindingBuilder.bind(routingThree).to(routingExchange).with("3");
            return binding;
        }
    }
    
  2. Producer

    @Component
    public class RoutingProducer {
    
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        public void send(String type){
            String sendMsg = "Routing mode: " + type + "   " + new Date();
            // The routing key decides which queue receives the message
            if (type.equals("1")){
                this.rabbitTemplate.convertAndSend("routingExchange","1",sendMsg);
            }
            if (type.equals("2")){
                this.rabbitTemplate.convertAndSend("routingExchange","2",sendMsg);
            }
            if (type.equals("3")){
                this.rabbitTemplate.convertAndSend("routingExchange","3",sendMsg);
            }
        }
    }
    
  3. Consumer

    @Component
    public class RoutingConsumer {
    
    
        @RabbitListener(queues = "queue.routing.one")
        public void processOne(String name) {
            System.out.println("queue.routing.one:" + name);
        }
    
        @RabbitListener(queues = "queue.routing.two")
        public void processTwo(String name) {
            System.out.println("queue.routing.two:" + name);
        }
    
        @RabbitListener(queues = "queue.routing.three")
        public void processThree(String name) {
            System.out.println("queue.routing.three:" + name);
        }
    
    }
    
  4. Test class (simulating the controller layer)

    @RestController
    @RequestMapping("/Routing")
    public class RoutingController {
    
        @Autowired
        private RoutingProducer sayProducer;
    
        @RequestMapping("/routing/{name}")
        public String send(@PathVariable String name){
            sayProducer.send(name);
            return "Sent successfully";
        }
    }
    
  5. Simulate a request with Postman, for example a GET request to http://localhost:8080/Routing/routing/1 with the application running locally

The value passed in the test request is the routing key itself, which makes the result easy to follow.

  6. Console output (screenshots unavailable)

The timestamps tell the individual deliveries apart.
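A direct exchange delivers a message only to queues whose binding key equals the message's routing key exactly. The following is a minimal in-memory sketch of that lookup, using the queue names and keys from the configuration above; DirectSketch is a hypothetical class, not part of Spring AMQP.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy in-memory direct exchange (hypothetical, not part of Spring AMQP). */
final class DirectSketch {
    // Binding table: routing key -> names of the queues bound with that key.
    private final Map<String, List<String>> bindings = new HashMap<>();

    /** Binds a queue to the exchange under one routing key. */
    void bind(String queueName, String routingKey) {
        bindings.computeIfAbsent(routingKey, k -> new ArrayList<>()).add(queueName);
    }

    /** Returns the queues that would receive a message published with this routing key. */
    List<String> route(String routingKey) {
        return new ArrayList<>(bindings.getOrDefault(routingKey, new ArrayList<>()));
    }

    public static void main(String[] args) {
        DirectSketch exchange = new DirectSketch();
        exchange.bind("queue.routing.one", "1");
        exchange.bind("queue.routing.two", "2");
        exchange.bind("queue.routing.three", "3");
        System.out.println(exchange.route("2")); // prints [queue.routing.two]
    }
}
```

A routing key with no binding, such as "9" in this model, yields an empty list: no queue receives the message.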

Topic Mode

  1. Queue initialization

    // Exchange type: topic
    @Configuration
    public class TopicInitialization {
    
        // The queues are created automatically if they do not already exist
        @Bean
        Queue topicOne(){
            Queue queue = new Queue("queue.topic.one",true);
            return queue;
        }
        @Bean
        Queue topicTwo(){
            Queue queue = new Queue("queue.topic.two",true);
            return queue;
        }
        @Bean
        Queue topicThree(){
            Queue queue = new Queue("queue.topic.three",true);
            return queue;
        }
    
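        // Create the exchange
        @Bean
        TopicExchange topicExchange(){
            TopicExchange topicExchange = new TopicExchange("topicExchange");
            return topicExchange;
        }
    
        // Bind the queues; in a binding pattern, * matches exactly one word and # matches zero or more words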
        @Bean
        Binding bindingTopicOne(Queue topicOne,TopicExchange topicExchange){
            Binding binding = BindingBuilder.bind(topicOne).to(topicExchange).with("#.error");
            return binding;
        }
        @Bean
        Binding bindingTopicTwo(Queue topicTwo,TopicExchange topicExchange){
            Binding binding = BindingBuilder.bind(topicTwo).to(topicExchange).with("#.log");
            return binding;
        }
        @Bean
        Binding bindingTopicThree(Queue topicThree,TopicExchange topicExchange){
            Binding binding = BindingBuilder.bind(topicThree).to(topicExchange).with("good.#.timer");
            return binding;
        }
    }
    
  2. Producer

    @Component
    public class TopicProducer {
    
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        public void send(String routing){
            String sendMsg = "Topic mode: " + routing + "   " + new Date();
            // Publish to the topic exchange, using the argument as the routing key
            this.rabbitTemplate.convertAndSend("topicExchange",routing,sendMsg);
    
        }
    }
    
  3. Consumer

    @Component
    public class TopicConsumer {
        @RabbitListener(queues = "queue.topic.one")
        public void processOne(String name) {
            System.out.println("queue.topic.one:" + name);
        }
    
        @RabbitListener(queues = "queue.topic.two")
        public void processTwo(String name) {
            System.out.println("queue.topic.two:" + name);
        }
    
        @RabbitListener(queues = "queue.topic.three")
        public void processThree(String name) {
            System.out.println("queue.topic.three:" + name);
        }
    
    }
    
  4. Test class (simulating the controller layer)

    @RestController
    @RequestMapping("/Topic")
    public class TopicController {
    
        @Autowired
        private TopicProducer sayProducer;
    
        @RequestMapping("/topic/{type}")
        public String send(@PathVariable String type){
            sayProducer.send(type);
            return "Sent successfully";
        }
    }
    
  5. Requests and their results

    Requests go to http://localhost:8080/Topic/topic/{type}, where {type} is used as the routing key.

    [Screenshots unavailable: three requests and their console output]

    Note how the routing key in each request corresponds to the printed log line.
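To see which queue a given routing key reaches, it helps to work through the matching rules by hand: words are separated by dots, * stands for exactly one word, and # stands for zero or more words. The matcher below is a small sketch of those rules in plain Java, not the broker's actual implementation; TopicMatchSketch is a hypothetical name and the sample routing keys in main are made up for illustration.

```java
/** Toy matcher for topic binding patterns (hypothetical, not the broker's implementation). */
final class TopicMatchSketch {
    /** Returns true if the dot-separated routing key matches the binding pattern. */
    static boolean matches(String pattern, String routingKey) {
        String[] p = pattern.isEmpty() ? new String[0] : pattern.split("\\.", -1);
        String[] k = routingKey.isEmpty() ? new String[0] : routingKey.split("\\.", -1);
        return match(p, 0, k, 0);
    }

    // Matches pattern words from index i against key words from index j.
    private static boolean match(String[] p, int i, String[] k, int j) {
        if (i == p.length) {
            return j == k.length; // pattern used up: the key must be used up too
        }
        if (p[i].equals("#")) {
            // '#' may absorb zero or more words: try every possible split point.
            for (int next = j; next <= k.length; next++) {
                if (match(p, i + 1, k, next)) {
                    return true;
                }
            }
            return false;
        }
        if (j == k.length) {
            return false; // pattern still expects a word, but the key has none left
        }
        if (p[i].equals("*") || p[i].equals(k[j])) {
            return match(p, i + 1, k, j + 1); // '*' or a literal word consumes one word
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(matches("#.error", "app.error"));             // prints true
        System.out.println(matches("#.log", "app.error"));               // prints false
        System.out.println(matches("good.#.timer", "good.a.b.timer"));   // prints true
    }
}
```

With the three bindings above, a key such as good.timer.log would reach only queue.topic.two in this model, because it ends in log but not in timer or error.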

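Closing

That wraps up the message queue series. Together with the previous two articles, this should be enough to understand the basic concepts and usage of queues.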
