Java 杂谈

01_Spring集成RabbitMQ之声明式注解

2019-08-29  本文已影响0人  明天你好向前奔跑

@Author Jacky Wang
日常积累,转载请注明出处,https://www.jianshu.com/p/b081f1fd1480
最近新开发的一个项目是由传统的SSM架构进行开发的,之前介绍了SpringBoot集成RabbitMQ的方式,这次特地对Spring集成RabbitMQ做一次记录及介绍。
如需较详细了解RabbitMQ的相关知识,可参考我的另一篇文章:03_SpringBoot集成RabbitMQ

由Spring集成RabbitMQ一般采用Xml配置或注解式两种方式来进行集成。由于个人不喜欢过多的xml文件,因此这里仅对注解方式进行记录。

1. 声明式注解集成RabbitMQ

1.1 步骤

  1. 引入pom依赖
  2. 创建配置文件,包含mq的基础配置
  3. 创建RabbitMQ监听器
  4. 声明RabbitMQ配置类
  5. 创建消息生产者
  6. 测试

1.2 集成

1.2.1 引入Pom依赖
<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.6.0</version>
</dependency>
<dependency>
    <groupId>org.springframework.amqp</groupId>
    <artifactId>spring-rabbit</artifactId>
    <version>2.1.5.RELEASE</version>
</dependency>
<!-- 解决冲突 -->
<dependency>
    <groupId>org.codehaus.jackson</groupId>
    <artifactId>jackson-mapper-asl</artifactId>
    <version>1.9.13</version>
</dependency>
1.2.2 创建配置文件 rabbitmq.properties
#rabbitmq.host=127.0.0.1
#rabbitmq服务器
rabbitmq.host=192.168.3.171
rabbitmq.port=5672
rabbitmq.username=guest
rabbitmq.password=guest
rabbitmq.virtual.host=/

#交换机
exchange.mes.com.add=mes.fanout.com.add
exchange.mes.com.del=mes.fanout.com.delete
exchange.mes.com.update=mes.fanout.com.update
exchange.mes.user.add=mes.fanout.user.add
exchange.mes.user.del=mes.fanout.user.delete
exchange.mes.user.update=mes.fanout.user.update

#队列
queue.mes.com.add=mes_company_add
queue.mes.com.del=mes_company_delete
queue.mes.com.update=mes_company_update
queue.mes.user.add=mes_user_add
queue.mes.user.del=mes_user_delete
queue.mes.user.update=mes_user_update
1.2.3 创建RabbitMQ监听器
/**
 * 消费监听类
 */
@Component
@Transactional
public class QueueListener {

    public static final Logger logger = LoggerFactory.getLogger(QueueListener.class);

    @Autowired
    private AppInterfaceService appInterfaceService;
    @Autowired
    private RabbitMQConfig rabbitMQConfig;
    @Autowired
    private OfficeMapper officeMapper;
    @Autowired
    private SystemService systemService;
    @Autowired
    private UserMapper userMapper;

    /**
    * 如果监听队列指定的方法不存在则执行默认方法
    */
    public void onMessage(byte[] msg) {
        try {
            logger.info("onMessage : [{}]", new String(msg, "UTF-8"));
        } catch (Exception e) {
            logger.error("Error : [{}]", e);
        }
    }

    /**
     * 公司信息同步
     *
     * @param message
     */
    public void addCompany(byte[] message) {
        logger.info("RabbitMQ Method addCompany Get Msg : [{}]", new String(message));
    }

    public void updateCompany(byte[] message) {
        logger.info("RabbitMQ Method updateCompany Get Msg : [{}]", new String(message));
    }

    /**
     * 用户信息同步
     *
     * @param message
     */
    public void addUser(byte[] message) {
        logger.info("RabbitMQ Method updateCompany Get Msg : [{}]", new String(message));
    }

    public void delUser(byte[] message) {
        logger.info("RabbitMQ Method delUser Get Msg : [{}]", new String(message));
    }

    public void updateUser(byte[] message) {
        logger.info("RabbitMQ Method updateUser Get Msg : [{}]", new String(message));
    }
}
1.2.4 创建RabbitMQ声明配置类
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.PropertySource;

import java.util.HashMap;
import java.util.Map;

@Configuration
@PropertySource(value = "classpath:/properties/rabbitmq.properties")
public class RabbitMQConfig {
    @Autowired
    private QueueListener queueListener;

    @Value("${rabbitmq.host}")
    private String host;
    @Value("${rabbitmq.port}")
    private int port;
    @Value("${rabbitmq.username}")
    private String username;
    @Value("${rabbitmq.password}")
    private String password;
    @Value("${rabbitmq.virtual.host}")
    private String vhost;

    @Value("${exchange.mes.com.add}")
    private String companyAddExchangeName;
    @Value("${exchange.mes.com.del}")
    private String companyDelExchangeName;
    @Value("${exchange.mes.com.update}")
    private String companyUpdateExchangeName;
    @Value("${exchange.mes.user.add}")
    private String userAddExchangeName;
    @Value("${exchange.mes.user.del}")
    private String userDelExchangeName;
    @Value("${exchange.mes.user.update}")
    private String userUpdateExchangeName;

    @Value("${queue.mes.com.add}")
    private String companyAddQueueName;
    @Value("${queue.mes.com.del}")
    private String companyDelQueueName;
    @Value("${queue.mes.com.update}")
    private String companyUpdateQueueName;
    @Value("${queue.mes.user.add}")
    private String userAddQueueName;
    @Value("${queue.mes.user.del}")
    private String userDelQueueName;
    @Value("${queue.mes.user.update}")
    private String userUpdateQueueName;

    /**
     * rabbitmq连接配置
     *
     * @return
     */
    @Bean
    public ConnectionFactory rabbitConnectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory(host, port);
        connectionFactory.setUsername(username);
        connectionFactory.setPassword(password);
        connectionFactory.setVirtualHost(vhost);
        return connectionFactory;
    }

    @Bean
    public RabbitTemplate rabbitTemplate() {
        RabbitTemplate template = new RabbitTemplate(rabbitConnectionFactory());
        template.setMessageConverter(jsonMessageConverter());
        return template;
    }

    @Bean
    public MessageConverter jsonMessageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    @Bean
    public RabbitAdmin rabbitAdmin() {
        RabbitAdmin admin = new RabbitAdmin(rabbitConnectionFactory());
        admin.setIgnoreDeclarationExceptions(true); //即使有关rabbitmq的bean初始化失败整个web应用还能正常启动
        return admin;
    }

    /**
     * 声明交换机Exchange
     *
     * @return
     */
    @Bean
    public FanoutExchange companyAddExchange() {
        return new FanoutExchange(companyAddExchangeName, true, false);
    }

    @Bean
    public FanoutExchange companyDelExchange() {
        return new FanoutExchange(companyDelExchangeName, true, false);
    }

    @Bean
    public FanoutExchange companyUpdateExchange() {
        return new FanoutExchange(companyUpdateExchangeName, true, false);
    }

    @Bean
    public FanoutExchange userAddExchange() {
        return new FanoutExchange(userAddExchangeName, true, false);
    }

    @Bean
    public FanoutExchange userDelExchange() {
        return new FanoutExchange(userDelExchangeName, true, false);
    }

    @Bean
    public FanoutExchange userUpdateExchange() {
        return new FanoutExchange(userUpdateExchangeName, true, false);
    }

    /**
     * 声明队列Queue
     *
     * @return
     */
    @Bean
    public Queue companyAddQueue() {
        return new Queue(companyAddQueueName, true, false, false);
    }

    @Bean
    public Queue companyDelQueue() {
        return new Queue(companyDelQueueName, true, false, false);
    }

    @Bean
    public Queue companyUpdateQueue() {
        return new Queue(companyUpdateQueueName, true, false, false);
    }

    @Bean
    public Queue userAddQueue() {
        return new Queue(userAddQueueName, true, false, false);
    }

    @Bean
    public Queue userDelQueue() {
        return new Queue(userDelQueueName, true, false, false);
    }

    @Bean
    public Queue userUpdateQueue() {
        return new Queue(userUpdateQueueName, true, false, false);
    }

    /**
     * 将队列绑定到指定的交换机
     *
     * @param companyAddQueue
     * @param companyAddExchange
     * @return
     */
    @Bean
    public Binding companyAddBinding(Queue companyAddQueue, FanoutExchange companyAddExchange) {
        return BindingBuilder.bind(companyAddQueue).to(companyAddExchange);
    }

    @Bean
    public Binding companyDelBinding(Queue companyDelQueue, FanoutExchange companyDelExchange) {
        return BindingBuilder.bind(companyDelQueue).to(companyDelExchange);
    }

    @Bean
    public Binding companyUpdateBinding(Queue companyUpdateQueue, FanoutExchange companyUpdateExchange) {
        return BindingBuilder.bind(companyUpdateQueue).to(companyUpdateExchange);
    }

    @Bean
    public Binding userAddBinding(Queue userAddQueue, FanoutExchange userAddExchange) {
        return BindingBuilder.bind(userAddQueue).to(userAddExchange);
    }

    @Bean
    public Binding userDelBinding(Queue userDelQueue, FanoutExchange userDelExchange) {
        return BindingBuilder.bind(userDelQueue).to(userDelExchange);
    }

    @Bean
    public Binding userUpdateBinding(Queue userUpdateQueue, FanoutExchange userUpdateExchange) {
        return BindingBuilder.bind(userUpdateQueue).to(userUpdateExchange);
    }

    /**
     * 声明消费者监听执行类
     * @param receiver
     * @return
     */
    /*@Bean
    public MessageListenerAdapter listenerAdapter(QueueListener receiver) {
        MessageListenerAdapter m = new MessageListenerAdapter(receiver, "process");
        m.setMessageConverter(jsonMessageConverter());
        return m;
    }*/

    /**
     * 消费者容器
     * 为不同队列指定不同的执行方法
     * @param rabbitConnectionFactory
     * @return
     */
    @Bean
    SimpleMessageListenerContainer listenerContainer(ConnectionFactory rabbitConnectionFactory) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(rabbitConnectionFactory);
        //container.setMessageConverter(jsonMessageConverter());
        //container.setConcurrentConsumers(1);
        //container.setMaxConcurrentConsumers(5);
        //container.setDefaultRequeueRejected(false);
        container.setAcknowledgeMode(AcknowledgeMode.AUTO);
        container.setConsumerTagStrategy(q -> projectKey + "_" + q);
        //container.setQueues(companyAddQueue(), companyDelQueue(), companyUpdateQueue(), userAddQueue(), userDelQueue(), userUpdateQueue());
        container.setQueueNames(companyAddQueueName, companyDelQueueName, companyUpdateQueueName, userAddQueueName, userDelQueueName, userUpdateQueueName);

        MessageListenerAdapter listenerAdapter = new MessageListenerAdapter(queueListener);
        listenerAdapter.setDefaultListenerMethod("onMessage");
        listenerAdapter.setMessageConverter(jsonMessageConverter());
        Map<String, String> queueOrTagToMethodName = new HashMap<>();
        queueOrTagToMethodName.put(companyAddQueueName, "addCompany");
        queueOrTagToMethodName.put(companyDelQueueName, "delCompany");
        queueOrTagToMethodName.put(companyUpdateQueueName, "updateCompany");
        queueOrTagToMethodName.put(userAddQueueName, "addUser");
        queueOrTagToMethodName.put(userDelQueueName, "delUser");
        queueOrTagToMethodName.put(userUpdateQueueName, "updateUser");
        listenerAdapter.setQueueOrTagToMethodName(queueOrTagToMethodName);
        container.setMessageListener(listenerAdapter);
        return container;
    }
}
1.2.5 创建消息生产者
@Service
public class RabbitMQSender {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    private final static Logger logger = LoggerFactory.getLogger(RabbitMQSender.class);

    public void sendDataToQueue(String exchange, String routingKey, Object object) {
        try {
            rabbitTemplate.setMessagePropertiesConverter(new MessagePropertiesConverter() {
                @Override
                public MessageProperties toMessageProperties(AMQP.BasicProperties source, Envelope envelope, String charset) {
                    MessageProperties messageProperties = new MessageProperties();
                    messageProperties.setContentType("application/json");
                    messageProperties.setContentEncoding("UTF-8");
                    return messageProperties;
                }

                @Override
                public AMQP.BasicProperties fromMessageProperties(MessageProperties source, String charset) {
                    return null;
                }
            });

            rabbitTemplate.convertAndSend(exchange, routingKey, object);
        } catch (Exception e) {
            logger.error("发送mq消息异常,Cause:[]", e);
        }

    }
}
1.2.6 测试
自定义Test或自定义Controller测试调用生产者发送消息,查看消费者是否消费即可。

eg:

@Controller
@RequestMapping("${adminPath}/rabbitmq")
public class RabbitMQController {

    @Autowired
    private RabbitMQSender rabbitMQSender;

    @RequestMapping("/sendToCompanyAdd")
    @ResponseBody
    public String sendToCompanyAdd(String id) {
        HashMap<String, String> map = new HashMap<>();
        map.put("id", id);
        rabbitMQSender.sendDataToQueue("mes.fanout.com.add", null, map);
        return "SUCCESS";
    }
}

3 注意事项

2. Xml方式集成配置文件参考

spring-rabbitmq.xml:

<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xmlns:context="http://www.springframework.org/schema/context"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
    http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
    http://www.springframework.org/schema/rabbit
    http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">

    <description>rabbitmq 连接服务配置</description>

    <!-- 加载配置属性文件 -->
    <context:property-placeholder ignore-unresolvable="true" location="classpath:/properties/rabbitmq.properties"/>

    <!-- 连接配置 -->
    <rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}" username="${rabbitmq.username}"
                               password="${rabbitmq.password}" port="${rabbitmq.port}"
                               virtual-host="${rabbitmq.vhost}"/>
    <rabbit:admin connection-factory="connectionFactory"/>

    <!-- spring template声明-->
    <rabbit:template id="amqpTemplate" connection-factory="connectionFactory"
                     message-converter="jsonMessageConverter"/>

    <!-- 消息对象json转换类 -->
    <bean id="jsonMessageConverter" class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter"/>

    <!-- 申明一个消息队列Queue -->
    <rabbit:queue id="testQueueId" name="${rabbitmq.queue}" durable="false" auto-delete="false" exclusive="false"/>

    <!-- 定义交换机 -->
    <rabbit:direct-exchange id="testExchangeId" name="${rabbitmq.exchange}" durable="true" auto-delete="false">
        <rabbit:bindings>
            <rabbit:binding queue="testQueueId" key="${rabbitmq.routingKey}"/>
        </rabbit:bindings>
    </rabbit:direct-exchange>

    <!-- MQ监听配置 -->
    <rabbit:listener-container connection-factory="connectionFactory" acknowledge="auto">
        <rabbit:listener queues="testQueueId" ref="queueListener" method="process"/>
    </rabbit:listener-container>
</beans>
上一篇下一篇

猜你喜欢

热点阅读