activeMQ

Spring整合ActiveMQ

2017-01-17  本文已影响228人  ouyangan

activemq安装

spring整合activemq

           <dependency>
               <groupId>org.apache.activemq</groupId>
               <artifactId>activemq-core</artifactId>
               <version>5.7.0</version>
           </dependency>
           <dependency>
               <groupId>org.apache.activemq</groupId>
               <artifactId>activemq-pool</artifactId>
               <version>5.14.3</version>
           </dependency>
    <!-- ActiveMQ 连接工厂 -->
    <bean id="amqConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="${activemq.brokerURL}"/>
        <property name="userName" value="${activemq.username}"/>
        <property name="password" value="${activemq.password}"/>
    </bean>
    <!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory -->
    <bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
        <property name="targetConnectionFactory" ref="amqConnectionFactory"/>
        <property name="sessionCacheSize" value="100"/>
    </bean>
    <!-- 定义JmsTemplate的Queue类型 -->
    <bean id="jmsQueueTemplate" class="org.springframework.jms.core.JmsTemplate">
        <!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 -->
        <constructor-arg ref="connectionFactory"/>
        <!-- 定义默认的队列名称-->
        <property name="defaultDestinationName" value="queueDemo"/>
        <!-- 非pub/sub模型(发布/订阅),即队列模式 -->
        <property name="pubSubDomain" value="false"/>
    </bean>
    <!-- 定义JmsTemplate的Topic类型 -->
    <bean id="jmsTopicTemplate" class="org.springframework.jms.core.JmsTemplate">
        <!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 -->
        <constructor-arg ref="connectionFactory"/>
        <property name="defaultDestinationName" value="topicDemo"/>
        <!-- pub/sub模型(发布/订阅) -->
        <property name="pubSubDomain" value="true"/>
    </bean>

    <!--监听器-->
    <bean id="queueDemoA" class="com.hunt.jms.demo.QueueDemoA"/>
    <bean id="subDemoA" class="com.hunt.jms.demo.SubDemoA"/>
    <bean id="subDemoB" class="com.hunt.jms.demo.SubDemoB"/>
    <bean id="subDemoC" class="com.hunt.jms.demo.SubDemoC"/>

    <!-- 定义Queue监听器 -->
    <jms:listener-container destination-type="queue" container-type="default" connection-factory="connectionFactory" acknowledge="auto">
        <jms:listener destination="queueDemo" ref="queueDemoA"/>
    </jms:listener-container>

    <!-- 定义Topic监听器 -->
    <jms:listener-container destination-type="topic" container-type="default" connection-factory="connectionFactory" acknowledge="auto">
        <jms:listener destination="topicDemo" ref="subDemoA"/>
        <jms:listener destination="topicDemo" ref="subDemoB"/>
        <jms:listener destination="topicDemo" ref="subDemoC"/>
    </jms:listener-container>
public class QueueDemoA implements MessageListener {
    //计数器
    private AtomicInteger count = new AtomicInteger(1);

    @Override
    public void onMessage(Message message) {
        try {
            System.out.println();
            Thread.currentThread().sleep(2000L);
            System.out.println("队列接收消息-> " + (count.incrementAndGet()));
            ActiveMQTextMessage message1 = (ActiveMQTextMessage) message;
            System.out.println(message1.getText());
        } catch (InterruptedException | JMSException e) {
            e.printStackTrace();
        }
    }
}
public class SubDemoA implements MessageListener {
    @Override
    public void onMessage(Message message) {
        try {
            System.out.println();
            Thread.currentThread().sleep(2000L);
            ActiveMQTextMessage message1 = (ActiveMQTextMessage) message;
            System.out.println("A接收消息-> " +message1.getText());
        } catch (InterruptedException | JMSException e) {
            e.printStackTrace();
        }
    }
}
public class SubDemoB implements MessageListener {
    @Override
    public void onMessage(Message message) {
        try {
            System.out.println();
            Thread.currentThread().sleep(2000L);
            ActiveMQTextMessage message1 = (ActiveMQTextMessage) message;
            System.out.println("B接收消息-> " +message1.getText());
        } catch (InterruptedException | JMSException e) {
            e.printStackTrace();
        }
    }
}
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration({"classpath:spring.xml"})
@Transactional
public class DebugTest {

    @Resource(name = "jmsQueueTemplate")
    private JmsTemplate jmsQueueTemplate;
    @Resource(name = "jmsTopicTemplate")
    private JmsTemplate jmsTopicTemplate;

    @Test
    public void testQueue() {
        for (int i = 0; i < 1000; i++) {
            sendMessage(String.valueOf(i + 1));
        }
        while (true) {

        }
    }

    @Test
    public void testTopic() {
        for (int i = 0; i < 1000; i++) {
            sendTopic(String.valueOf(i + 1));
        }
        while (true) {

        }
    }
    private void sendMessage(String msg) {
        jmsQueueTemplate.send(session -> session.createTextMessage(msg));
    }

    private void sendTopic(String msg) {
        jmsTopicTemplate.send(session -> session.createTextMessage(msg));
    }
}
上一篇下一篇

猜你喜欢

热点阅读