SpringBoot整合activeMQ
ActiveMQ简介
ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现,尽管JMS规范出台已经是很久的事情了,但是JMS在当今的J2EE应用中间仍然扮演着特殊的地位。
ActiveMQ特性
1.多语言和协议编写客户端。语言: Java,C,C++,C#,Ruby,Perl,Python,PHP。
2.应用协议: OpenWire,Stomp REST,WS Notification,XMPP,AMQP。
3。完全支持JMS1.1和J2EE 1.4规范(持久化、XA消息,事物)。
4.对Spring的支持,ActiveMQ可以很容易嵌套到使用Spring的系统里面去,而且也支持Spring2.0的特征。
5.通过了常见的J2EE服务器(如Geronimo,JBoss 4,GlassFish,WebLogic)的测试,其中通过JCA 1.5。
6.resource adaptors的配置,可以让ActiveMQ可以自动的部署到任何兼容J2EE 1.4 商业服务器上。
7.支持多种传送协议: in-VM,TCP,SSL,NIO,UDP,JGroups,JXTA。
8.支持通过JDBC和journal提供高速的消息持久化。
9.从设计上保证了高性能的集群,客户端-服务器,点对点。
10.支持Ajax。
11.支持和Axis的整合。
12.可以很容易的调用内嵌JMS provider,进行测试。
什么情况下使用ActiveMQ?
多个项目之间集成 ;跨平台; 多语言 ;多项目;降低系统间模块的耦合度,解耦 ;软件扩展性;系统前后端隔离 ;前后端隔离,屏蔽高安全区;
入门实例
引入依赖配置
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-pool</artifactId>
</dependency>
application.properties配置文件中新增配置
# ActiveMQ--------------------------------------------------------------------------------------------------------------
# Specify if the default broker URL should be in memory. Ignored if an explicit broker has been specified.
# 默认为true表示使用内存的activeMQ,不需要安装activeMQ server
#spring.activemq.in-memory=false
# URL of the ActiveMQ broker. Auto-generated by default. For instance `tcp://localhost:61616`
spring.activemq.broker-url=tcp://localhost:61616
# Login user of the broker.
spring.activemq.user=admin
# Login password of the broker.
spring.activemq.password=admin
# Trust all packages.
#spring.activemq.packages.trust-all=false
# Comma-separated list of specific packages to trust (when not trusting all packages).
#spring.activemq.packages.trusted=
# See PooledConnectionFactory.
#spring.activemq.pool.configuration.*=
# Whether a PooledConnectionFactory should be created instead of a regular ConnectionFactory.
spring.activemq.pool.enabled=true
# Maximum number of pooled connections.
spring.activemq.pool.max-connections=50
# Connection expiration timeout in milliseconds.
spring.activemq.pool.expiry-timeout=10000
# Connection idle timeout in milliseconds.
spring.activemq.pool.idle-timeout=30000
# 如果为True,则是Topic;如果是false或者默认,则是queue。
spring.jms.pub-sub-domain=false
新建消息生产者类
@Service("producer")
public class Producer {
@Resource
private JmsTemplate jmsTemplate;
/**
* 发送消息
*
* @param destination 发送到的队列
* @param message 待发送的消息
*/
public void convertAndSend(Destination destination, final String message) {
jmsTemplate.convertAndSend(destination, message);
}
}
新建消息消费者ABC
@Component
public class ConsumerA {
/**
* 使用JmsListener配置消费者监听的队列
*
* @param text 接收到的消息
*/
@JmsListener(destination = "suimh_queue")
public void receiveQueue(String text) {
System.out.println("Consumer-A : 收到的报文为:" + text);
}
}
@Component
public class ConsumerB {
/**
* 使用JmsListener配置消费者监听的队列
*
* @param text 接收到的消息
*/
@JmsListener(destination = "suimh_queue")
@SendTo("out.queue")
public String receiveQueue(String text) {
System.out.println("Consumer-B : 收到的报文为:" + text);
return text;
}
}
@Component
public class ConsumerC {
/**
* 使用JmsListener配置消费者监听的队列
*
* @param text 接收到的消息
*/
@JmsListener(destination = "out.queue")
public void consumerMessage(String text) {
System.out.println("Consumer-C : 从out.queue队列收到的回复报文为:" + text);
}
}
测试:
/**
* activeMQ测试方法(queue队列模式)
* 如果想用Topic发布订阅模式,需要新建ActiveMQTopic实例,并修改配置文件spring.jms.pub-sub-domain=true
*
* @return String
*/
@GetMapping(value = "/activeMqSendMes")
@ResponseBody
public String activeMqSendMes() {
int num = 10;
try {
Destination destinationQueue = new ActiveMQQueue("suimh_queue");
for (int i = 1; i <= num; i++) {
producer.convertAndSend(destinationQueue, "这是queueProducer发送的第" + i + "个消息!");
}
return "activeMQ生产成功!";
} catch (Exception e) {
return "activeMQ生产失败!";
}
}