MQ

找到组织——spring整合mq

2017-11-05  本文已影响6人  getthrough
    <!--集中定义依赖版本-->
    <properties>
        <activemq.version>5.11.2</activemq.version>
        <spring.version>4.2.4.RELEASE</spring.version>
        <junit.version>4.12</junit.version>
    </properties>

    <dependencies>
        <!--activemq-->
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-all</artifactId>
            <version>${activemq.version}</version>
        </dependency>
        <!--jms-->
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-jms</artifactId>
            <version>${spring.version}</version>
        </dependency>
        <!--整合支持-->
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-context-support</artifactId>
            <version>${spring.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-test</artifactId>
            <version>${spring.version}</version>
        </dependency>
        <!--junit-->
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>${junit.version}</version>
        </dependency>
    </dependencies>
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:context="http://www.springframework.org/schema/context"

       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.2.xsd
    http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.2.xsd">

    <!--开启注解扫描-->
    <context:annotation-config/>

    <!--JMS服务厂商提供的连接工厂-->
    <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="tcp://192.168.25.128:61616"/>
    </bean>

    <!--spring对厂商提供的连接工厂的包装,方便更换不同的JMS实现-->
    <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
        <!--引用了实际的连接工厂-->
        <property name="targetConnectionFactory" ref="targetConnectionFactory"/>
    </bean>

    <!--配置生产者,使用spring提供的jms模板-->
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <!--将spring包装后的连接工厂注入-->
        <property name="connectionFactory" ref="connectionFactory"/>
    </bean>

    <!--点对点消息队列-->
    <bean class="org.apache.activemq.command.ActiveMQQueue" id="activeMQQueue">
        <constructor-arg>
            <!--指定队列名称-->
            <value>spring_queue</value>
        </constructor-arg>
    </bean>

    <!--发布/订阅消息队列-->
    <bean class="org.apache.activemq.command.ActiveMQTopic" id="activeMQTopic">
        <constructor-arg>
            <value>spring_topic</value>
        </constructor-arg>
    </bean>

spring整合后的连接工厂是包装后的连接工厂,今后若是项目想要替换消息中间件技术如RabbitMqRocketMq,只需要在配置文件中更改连接工厂的实现即可(还有注解信息...)。

/**
 * 使用spring整合Junit4方式测试代码
 * 避免了测试方法前使用@Before注解手动初始化spring容器
 */
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:applicationContext.xml")
public class Queue_Producer {

    @Resource(name = "jmsTemplate")
    private JmsTemplate jmsTemplate;

    @Resource(name = "activeMQQueue")
    private Queue queue;

    @Test
    public void sendQueueMessage() {
        jmsTemplate.send(queue, new MessageCreator() {
            public Message createMessage(Session session) throws JMSException {
                return session.createTextMessage("text five!");
            }
        });
    }
}
/**
 * 消息监听器
 */
public class MyMessageListener implements MessageListener {
    public void onMessage(Message message) {
        try {
            TextMessage textMessage = (TextMessage) message;
            System.out.println(textMessage.getText());
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}
-------------------在applicationContext.xml中追加以下内容---------------------
    <!--消息监听器-->
    <bean id="myMessageListener" class="listener.MyMessageListener"/>

    <!--消息监听容器-->
    <bean class="org.springframework.jms.listener.DefaultMessageListenerContainer" id="messageListenerContainer">
        <property name="connectionFactory" ref="connectionFactory"/>
        <property name="messageListener" ref="myMessageListener"/>
        <property name="destination" ref="activeMQQueue"/>
    </bean>

运行生产者类,消息即可被消费。

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:applicationContext.xml")
public class Topic_Producer {

    @Autowired
    private JmsTemplate jmsTemplate;

    @Autowired
    @Qualifier("activeMQTopic")
    private Topic topic;

    @Test
    public void sendMessage() {
        jmsTemplate.send(topic, new MessageCreator() {
            public Message createMessage(Session session) throws JMSException {
                return session.createTextMessage("its a topic message");
            }
        });
    }
}

applicationContext.xml中创建监听容器,监听类为上文的 listener

    <!--广播消息监听容器-->
    <bean class="org.springframework.jms.listener.DefaultMessageListenerContainer" id="topicMessageListenerContainer">
        <property name="connectionFactory" ref="connectionFactory"/>
        <property name="messageListener" ref="myMessageListener"/>
        <property name="destination" ref="activeMQTopic"/>
    </bean>

运行生产者类,消息即可被消费。

spring 整合了 ActiveMq 之后,当 spring 容器初始化时,监听器就会监听连接工厂中定义的 ip 和 端口,一旦生产者将消息发送至服务器,监听器就会获取消息进行消费。

注意:本案例的MyMessageListener类不能定义在Test目录下,否则无法解析该类。

若有疑惑,可能阅读 http://www.jianshu.com/p/1439340d2388 有所帮助。

本文完。

代码地址:https://github.com/Getthrough/ActiveMq_Spring_DemoCode

上一篇 下一篇

猜你喜欢

热点阅读