Spring整合ActiveMQ
2017-01-17 本文已影响228人
ouyangan
activemq安装
- 进入activemq官网
- 下载apache-activemq-5.9.1-bin.tar.gz
- 配置jdk,上传activemq至服务器
- `tar zxvf apache-activemq-5.9.1-bin.tar.gz`
- `chmod 755 [activemq_install_dir]/bin/activemq`
- `cd [activemq_install_dir]/bin`
- `./activemq start`,更多详细命令参考 http://activemq.apache.org/version-5-getting-started.html
- 访问监控页面
http://ipAddress:8161
spring整合activemq
- 依赖
<!--
  Keep both ActiveMQ artifacts on the same release. activemq-core was last
  published as 5.7.0 (from 5.8 on, its client classes live in activemq-client),
  so combining it with activemq-pool 5.14.3 would put two different versions of
  the same ActiveMQ client classes on the classpath.
-->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-client</artifactId>
<version>5.14.3</version>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-pool</artifactId>
<version>5.14.3</version>
</dependency>
- 配置
<!--
  ActiveMQ connection factory: the vendor ConnectionFactory that actually talks to the broker.
  Broker URL, user name and password are ${...} placeholders, so a property placeholder
  configurer (not shown in this snippet) must supply activemq.brokerURL,
  activemq.username and activemq.password.
-->
<bean id="amqConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="${activemq.brokerURL}"/>
<property name="userName" value="${activemq.username}"/>
<property name="password" value="${activemq.password}"/>
</bean>
<!--
  Spring-side ConnectionFactory that wraps and manages the real (ActiveMQ) ConnectionFactory.
  This is the bean the JmsTemplates and listener containers reference; sessionCacheSize
  configures the size of its JMS Session cache.
-->
<bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
<property name="targetConnectionFactory" ref="amqConnectionFactory"/>
<property name="sessionCacheSize" value="100"/>
</bean>
<!-- JmsTemplate used for sending to a queue (point-to-point) -->
<bean id="jmsQueueTemplate" class="org.springframework.jms.core.JmsTemplate">
<!-- This connectionFactory is the Spring-provided ConnectionFactory bean defined in this configuration -->
<constructor-arg ref="connectionFactory"/>
<!-- Default queue name, used when a send call does not name a destination -->
<property name="defaultDestinationName" value="queueDemo"/>
<!-- Not the pub/sub (publish/subscribe) model, i.e. queue mode -->
<property name="pubSubDomain" value="false"/>
</bean>
<!-- JmsTemplate used for publishing to a topic (publish/subscribe) -->
<bean id="jmsTopicTemplate" class="org.springframework.jms.core.JmsTemplate">
<!-- This connectionFactory is the Spring-provided ConnectionFactory bean defined in this configuration -->
<constructor-arg ref="connectionFactory"/>
<!-- Default topic name, used when a send call does not name a destination -->
<property name="defaultDestinationName" value="topicDemo"/>
<!-- pub/sub (publish/subscribe) model -->
<property name="pubSubDomain" value="true"/>
</bean>
<!--
  Listener beans: one queue consumer and three topic subscribers.
  NOTE(review): no SubDemoC class appears in the listener listings of this article;
  presumably it mirrors SubDemoA/SubDemoB and must exist for this context to load.
-->
<bean id="queueDemoA" class="com.hunt.jms.demo.QueueDemoA"/>
<bean id="subDemoA" class="com.hunt.jms.demo.SubDemoA"/>
<bean id="subDemoB" class="com.hunt.jms.demo.SubDemoB"/>
<bean id="subDemoC" class="com.hunt.jms.demo.SubDemoC"/>
<!--
  Queue listener container: delivers messages from the queue "queueDemo" to the
  queueDemoA bean, using automatic acknowledgement.
  Requires the Spring "jms" XML namespace to be declared on the enclosing beans element.
-->
<jms:listener-container destination-type="queue" container-type="default" connection-factory="connectionFactory" acknowledge="auto">
<jms:listener destination="queueDemo" ref="queueDemoA"/>
</jms:listener-container>
<!--
  Topic listener container: registers subDemoA, subDemoB and subDemoC as three separate
  listeners on the topic "topicDemo", using automatic acknowledgement.
  Requires the Spring "jms" XML namespace to be declared on the enclosing beans element.
-->
<jms:listener-container destination-type="topic" container-type="default" connection-factory="connectionFactory" acknowledge="auto">
<jms:listener destination="topicDemo" ref="subDemoA"/>
<jms:listener destination="topicDemo" ref="subDemoB"/>
<jms:listener destination="topicDemo" ref="subDemoC"/>
</jms:listener-container>
- 实现监听器
/**
 * Demo queue listener: simulates slow processing, then prints a running message
 * count followed by the text of the received message.
 */
public class QueueDemoA implements MessageListener {
    /** Number of messages this listener instance has handled so far. */
    private final AtomicInteger count = new AtomicInteger();

    /**
     * Handles one message delivered to this listener.
     *
     * <p>Messages that are not {@link ActiveMQTextMessage} instances are reported on
     * {@code System.err} and skipped instead of failing with a {@code ClassCastException}.
     *
     * @param message the delivered JMS message
     */
    @Override
    public void onMessage(Message message) {
        try {
            System.out.println();
            // Simulate a slow consumer; sleep is static, so call it on Thread directly.
            Thread.sleep(2000L);
            // Counter starts at 0, so the first message is reported as 1.
            System.out.println("队列接收消息-> " + count.incrementAndGet());
            if (!(message instanceof ActiveMQTextMessage)) {
                System.err.println("Unexpected message type: "
                        + (message == null ? "null" : message.getClass().getName()));
                return;
            }
            ActiveMQTextMessage textMessage = (ActiveMQTextMessage) message;
            System.out.println(textMessage.getText());
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the calling thread can observe the interruption.
            Thread.currentThread().interrupt();
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}
/**
 * Demo topic subscriber "A": simulates slow processing, then prints the text of
 * the received message.
 */
public class SubDemoA implements MessageListener {
    /**
     * Handles one message delivered to this subscriber.
     *
     * <p>Messages that are not {@link ActiveMQTextMessage} instances are reported on
     * {@code System.err} and skipped instead of failing with a {@code ClassCastException}.
     *
     * @param message the delivered JMS message
     */
    @Override
    public void onMessage(Message message) {
        try {
            System.out.println();
            // Simulate a slow consumer; sleep is static, so call it on Thread directly.
            Thread.sleep(2000L);
            if (!(message instanceof ActiveMQTextMessage)) {
                System.err.println("Unexpected message type: "
                        + (message == null ? "null" : message.getClass().getName()));
                return;
            }
            ActiveMQTextMessage textMessage = (ActiveMQTextMessage) message;
            System.out.println("A接收消息-> " + textMessage.getText());
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the calling thread can observe the interruption.
            Thread.currentThread().interrupt();
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}
/**
 * Demo topic subscriber "B": simulates slow processing, then prints the text of
 * the received message.
 */
public class SubDemoB implements MessageListener {
    /**
     * Handles one message delivered to this subscriber.
     *
     * <p>Messages that are not {@link ActiveMQTextMessage} instances are reported on
     * {@code System.err} and skipped instead of failing with a {@code ClassCastException}.
     *
     * @param message the delivered JMS message
     */
    @Override
    public void onMessage(Message message) {
        try {
            System.out.println();
            // Simulate a slow consumer; sleep is static, so call it on Thread directly.
            Thread.sleep(2000L);
            if (!(message instanceof ActiveMQTextMessage)) {
                System.err.println("Unexpected message type: "
                        + (message == null ? "null" : message.getClass().getName()));
                return;
            }
            ActiveMQTextMessage textMessage = (ActiveMQTextMessage) message;
            System.out.println("B接收消息-> " + textMessage.getText());
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the calling thread can observe the interruption.
            Thread.currentThread().interrupt();
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}
- 测试
/**
 * Manual demo test: sends a batch of text messages through the queue and topic
 * {@code JmsTemplate}s, then keeps the JVM alive so listeners in the same Spring
 * context can consume them.
 *
 * <p>Each test method blocks until the test thread is interrupted, so the tests are
 * meant to be run one at a time and stopped by hand.
 */
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration({"classpath:spring.xml"})
@Transactional
public class DebugTest {
    /** Number of messages each test sends. */
    private static final int MESSAGE_COUNT = 1000;

    @Resource(name = "jmsQueueTemplate")
    private JmsTemplate jmsQueueTemplate;
    @Resource(name = "jmsTopicTemplate")
    private JmsTemplate jmsTopicTemplate;

    /** Sends the messages "1".."1000" via the queue template, then waits for the consumers. */
    @Test
    public void testQueue() {
        for (int i = 1; i <= MESSAGE_COUNT; i++) {
            sendMessage(String.valueOf(i));
        }
        blockForever();
    }

    /** Sends the messages "1".."1000" via the topic template, then waits for the subscribers. */
    @Test
    public void testTopic() {
        for (int i = 1; i <= MESSAGE_COUNT; i++) {
            sendTopic(String.valueOf(i));
        }
        blockForever();
    }

    /** Sends {@code msg} as a text message using the queue template. */
    private void sendMessage(String msg) {
        jmsQueueTemplate.send(session -> session.createTextMessage(msg));
    }

    /** Sends {@code msg} as a text message using the topic template. */
    private void sendTopic(String msg) {
        jmsTopicTemplate.send(session -> session.createTextMessage(msg));
    }

    /**
     * Parks the calling thread until it is interrupted. A thread joining itself never
     * completes, so this keeps the JVM alive without the CPU spin of an empty
     * {@code while (true)} loop.
     */
    private static void blockForever() {
        try {
            Thread.currentThread().join();
        } catch (InterruptedException e) {
            // Restore the interrupt flag and let the test method return.
            Thread.currentThread().interrupt();
        }
    }
}
- 队列效果

  Paste_Image.png
- 发布/订阅效果

  Paste_Image.png