RabbitMQ学习(七)与Spring AMQP整合

2018-11-29  本文已影响0人  kobe0429

看到此篇的时候恭喜同学们已经对RabbitMQ的使用有了大致的掌握,前面6篇都是基于RabbitMQ的基础api实现的,从此篇开始我们将分别讨论与Spring AMQP、Spring Boot、Spring Cloud的整合操作,本篇主要讨论与Spring AMQP的整合流程。
主要与以下6种组件的整合:
RabbitAdmin
SpringAMQP声明
RabbitTemplate
SimpleMessageListenerContainer
MessageListenerAdapter
MessageConverter

1、RabbitAdmin

(1)RabbitAdmin类可以很好的操作RabbitMQ,在Spring中直接进行注入即可
(2)autoStartup必须设置true,否则Spring容器不会加载RabbitAdmin类
(3)RabbitAdmin底层实现就是从Spring容器中获取Exchagge,Bingding,RoutingKey以及Queue的@Bean声明
(4)然后使用RabbitTemplate的execute方法执行对应声明,修改,删除等操作

2、SpringAMQP声明

SpringAMQP声明即在rabbit基础API里面声明一个exchange、Bingding、queue。使用SpringAMQP去声明,就需要使用@bean的声明方式。

3、RabbitTemplate

Spring AMQP 提供了 RabbitTemplate 来简化 RabbitMQ 发送和接收消息操作

4、SimpleMessageListenerContailer

(1)简单消息监听容器:这个类非常的强大,我们可以对他进行很多设置,对于消费者的配置项,这个类都可以满足
(2)设置事务特性,事务管理器,事务属性,事务容量,事务开启等
(3)设置消息确认和自动确认模式,是否重回队列,异常捕获handler函数
(4)设置消费者标签生成策略,是否独占模式,消费者属性等
(5)simpleMessageListenerContailer可以进行动态设置,比如在运行中的应用可以动态的修改其消费者数量的大小,接收消息的模式等

5、MessageListenerAdapter

1)可以把一个没有实现MessageListener和ChannelAwareMessageListener接口的类适配成一个可以处理消息的处理器
2)默认的方法名称为:handleMessage,可以通过setDefaultListenerMethod设置新的消息处理方法
3)MessageListenerAdapter支持不同的队列交给不同的方法去执行。使用setQueueOrTagToMethodName方法设置,当根据queue名称没有找到匹配的方法的时候,就会交给默认的方法去处理。

6、MessageConverter

(1)消息转换器
(2)在进行发送消息的时候,正常情况下消息体为二进制的数据方式进行传输,如果希望内部帮我们进行转换,或者指定自定义的转换器,就需要MessageConverter
(3)实现MessageConverter接口,重写toMessage(java对象转换为Message) fromMessage(Message对象转换为java对象)
(4)json转换器,自定义二进制转换器(比如图片类型,pdf,ppt,流媒体)

下面我们用代码来演示以下RMQ是怎么整合到Spring中去的,以RabbitAdmin为例:
1、在pom.xml中添加引用

           <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>3.6.5</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>   

2、创建配置类,即为类添加配置注解和扫描注解

package com.bfxy.spring;

import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;

@Configuration
@ComponentScan({"com.bfxy.spring.*"})
public class RabbitMQConfig {
//TODO
}

3、在配置类中通过Bean注入的形式把组件注入到spring容器中

package com.bfxy.spring;

import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;

@Configuration
@ComponentScan({"com.bfxy.spring.*"})
public class RabbitMQConfig {

    @Bean
    public ConnectionFactory connectionFactory(){
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setAddresses("10.136.197.244:5672");
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        connectionFactory.setVirtualHost("/");
        return connectionFactory;
    }
    
    @Bean
    public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
        RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
        rabbitAdmin.setAutoStartup(true);
        return rabbitAdmin;
    }

以上,准备工作完成,让我们写一个测试类来验证一下吧

package com.bfxy.spring;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
public class ApplicationTests {
    @Test
    public void contextLoads() {
    }
        @Autowired
    private RabbitAdmin rabbitAdmin;    
    @Test
    public void testAdmin() throws Exception {
        rabbitAdmin.declareExchange(new DirectExchange("test.direct", false, false));       
        rabbitAdmin.declareExchange(new TopicExchange("test.topic", false, false));     
        rabbitAdmin.declareExchange(new FanoutExchange("test.fanout", false, false));       
        rabbitAdmin.declareQueue(new Queue("test.direct.queue", false));        
        rabbitAdmin.declareQueue(new Queue("test.topic.queue", false));     
        rabbitAdmin.declareQueue(new Queue("test.fanout.queue", false));
                rabbitAdmin.declareBinding(new Binding("test.direct.queue",
                Binding.DestinationType.QUEUE,
                "test.direct", "direct", new HashMap<>()));     
        rabbitAdmin.declareBinding(
                BindingBuilder
                .bind(new Queue("test.topic.queue", false))     //直接创建队列
                .to(new TopicExchange("test.topic", false, false))  //直接创建交换机 建立关联关系
                .with("user.#"));   //指定路由Key       
        rabbitAdmin.declareBinding(
                BindingBuilder
                .bind(new Queue("test.fanout.queue", false))        
                .to(new FanoutExchange("test.fanout", false, false)));      
        //清空队列数据
        rabbitAdmin.purgeQueue("test.topic.queue", false);
    }
}

以上代码我们创建了一些交换机和队列,启动测试类,去RabbitMQ的管控台查看信息如下:


RabbitMQ管控台信息.JPG
上一篇下一篇

猜你喜欢

热点阅读