ActiveMQ 从零到最佳实践
最近一段时间想把自己在以前项目中用的技术总结出来比如 nginx,redis比较实用的技术分享一下自己的心得体会供更多有求知欲望的朋友学习,当然自己的总结有一定的局限性,在总结这篇文章时我也查了很多资料,在这里我就先把ActiveMQ 走一遍,随后一段时间会对nginx,redis进行详细的讲解
ActiveMQ 简介:ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现,尽管JMS规范出台已经是很久的事情了,但是JMS在当今的J2EE应用中间仍然扮演着特殊的地位。
在学习之前我们先要知道一下几个问题
ActiveMQ特性
⒈ 多种语言和协议编写客户端。语言: Java,C,C++,C#,Ruby,Perl,Python,PHP。应用协议: OpenWire,Stomp REST,WS Notification,XMPP,AMQP
⒉ 完全支持JMS1.1和J2EE 1.4规范 (持久化,XA消息,事务)
⒊ 对Spring的支持,ActiveMQ可以很容易内嵌到使用Spring的系统里面去,而且也支持Spring2.0的特性
⒋ 通过了常见J2EE服务器(如 Geronimo,JBoss 4,GlassFish,WebLogic)的测试,其中通过JCA 1.5 resource adaptors的配置,可以让ActiveMQ可以自动的部署到任何兼容J2EE 1.4 商业服务器上
⒌ 支持多种传送协议:in-VM,TCP,SSL,NIO,UDP,JGroups,JXTA
⒍ 支持通过JDBC和journal提供高速的消息持久化
⒎ 从设计上保证了高性能的集群,客户端-服务器,点对点
⒏ 支持Ajax
⒐ 支持与Axis的整合
⒑ 可以很容易的调用内嵌JMS provider,进行测试
为什么要用ActiveMQ?
高性能的数据分发:ActiveMQ的这个特性主要关注的是消息的吞吐率以及高效的消息投递路由,中心思想就是在一个大的网络中尽可能快的传递大量的并且快速改变的消息数据。
鉴于大量的数据和频繁的数据数据交换负荷很高,所以这种情况下很少使用数据持久化,在失败时丢失几条数据也是可以接受的因为老的数据通常都不再被需要了,最新的数据才是真正我们关心的。
集群和通用的异步消息模型:这种特性重点在网络延迟和速度,当实现一个web或者EJB集群的时候,目的是维护一个node集群,典型的是使用多点广播来discovery&keep-alive然后使用socket直接连接这些node来进行高效的通信。
这和使用JMS provider在EJB-Style或者WS-style的服务中作为RMI层是很相似的,都能使用多点广播来discovery&keep-alive并且使用socket直接连接通信以减少延迟。所以与其使用不同的服务器来协调client之间的通信,不如让client直接和彼此通信来减少延迟。
Ps: 此段主要讲的是activeMQ的node之间会有高效的异步通信机制,网络延迟小并且高效
网络数据流:这种特性关注点是activeMQ的ajax支持,越来越多的人希望数据流能实时的传递到网络浏览器中,例如金融行业的股票价格数据,实时的在展示IM会话,实时拍卖并且动态更新内容和消息。
鉴于这种情况,我们把ActiveMQ集成到了web容器中来提供封闭的网络集成,使用HTTP POSTS来发布消息并且在js中通过HTTP GET来接受并展示消息。
简易的使用HTTP来传递消息的API:ActiveMQ这种特性主要关注跨语言跨技术的连接能力,我们为message broker提供了一个HTTP接口允许跨语言或者技术来进行简单的发送和接受消息。使用HTTP POST将消息发送到message broker,使用HTTP GET从message broker获取消息,使用URI并且指定参数来决定接受/发送的目的地。
什么场景下使用了ActiveMQ?
在分布式应用的环境中,多个独立运行的系统之间,需要相互调用,相互通讯的场景下使用。
在我们的商城中有很多地方需要用到ActiveMQ通讯的,我简单列举几个吧:
商品系统在确认订单后,发送一个“生产订单”的主题消息到ActiveMQ,订单系统收到该主题消息后会做相应的生产订单及相关的所有业务操作。
在支付界面中,点击确认支付,调用第三方支付接口,支付成功后回调我们的支付系统,然后支付系统会发送一个“支付成功”的主题消息到ActiveMQ,仓储系统收到该主题消息后会做库存的相关操作及其相应的业务。
在网页静态化中,我们修改了某些数据,那么会发送一个“生成静态化”的队列消息到ActiveMQ,负责生成静态化页面的方法接收到该消息后,从新生成静态化页面,从而达到更新页面数据的目的。
JMS
JMS(Java Messaging Service)应用程序接口,是Java平台上有关面向消息中间件(MOM)的技术规范API,它便于消息系统中的Java应用程序进行消息交换,并且通过提供标准的产生、发送、接收消息的接口简化企业应用的开发,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。翻译为Java消息服务。
JMS规范:
JMS定义了Java中访问消息中间件的接口,并没有给予实现,实现JSM接口的消息中间件称为JMS Provider,例如ActiveMQ
几个关于JMS的概念:
1、JMS Provider:实现JMS接口和规范的消息中间件
2、JMS Message:JMS消息,分3个部分:
(1)消息头:每个消息头字段都有相应的getter和setter方法
(2)消息属性:如果需要除消息头字段以外的值,那么可以使用消息属性
(3)消息体:封装具体的消息数据
3、JMS Producer:消息生产者,创建和发送JMS消息的客户端应用
4、JMS Consumer:消息消费者,接收和处理JMS消息的客户端应用
5、JMS Domain:消息传递域,JMS规范中定义了两种消息传递域:
(1)点对点(point-to-point,简写PTP或P2P)消息传递域,该消息传递域发送的消息目的地称为队列(queue)
特点:
a、每个消息只能有一个消费者
b、消息的生产者和消费者之间没有时间上的相关性,无论消息消费者在提取消息的时候,消息生产者是否处于运行状态,消息消费者还是可以提取消息
(2)发布/订阅(publish/subscribe,简写pub/sub)消息传递域,该消息传递域发送的消息目的地称为主题(topic)
特点:
a、每个消息可以有多个消费者
b、生产者和消费者之间有时间上的相关性,订阅一个主题的消费者只能消费自它订阅之后发布的消息。
ActiveMQ和JMS是什么关系?
JMS和ActiveMQ是接口和实现的关系,ActiveMQ完全支持JMS1.1规范的JMS Provider实现的消息中间件(MOM)
JMS的消息传递域有什么?各自有什么特点?简单列举一下各自的一些使用场景
JMS的消息传递域有点对点(point-to-point,简写PTP或P2P)消息传递域和发布/订阅(publish/subscribe,简写pub/sub)消息传递域。
点对点消息传递域的特点:每个消息只能有一个消费者并且消息的生产者和消费者之间没有时间上的相关性,无论消息消费者在提取消息的时候,消息生产者是否处于运行状态,消息消费者还是可以提取消息
发布/订阅消息传递域的特点:每个消息可以有多个消费者并且生产者和消费者之间有时间上的相关性,订阅一个主题的消费者只能消费自它订阅之后发布的消息。
ActiveMQ的原理是什么?
首先需要创建连接工厂(ConnectionFactory ),用于连接ActiveMQ的服务端,其中ConnectionFactory 又有两个子类分别是队列连接工厂(QueueConnectionFactory)和主题连接工厂(TopicConnectionFactory),在我们发送消息的时候具体使用哪个连接工厂是由消息发送的目的地(Destination)来决定的,Destination是消息生产者的消息发送目标或者说消息消费者的消息来源。对于消息生产者来说,它的Destination是某个队列(Queue)或某个主题(Topic),对于消息消费者来说,它的Destination也是某个队列或主题(即消息来源)
然后在确定了使用哪种连接工厂后,就通过该连接工厂创建出连接对象(Connection),该连接对象是对TCP/IP socket的包装。连接对象可以产生多个会话(session),通过会话来发送消息(Message),这就是我们发送和接受的消息。
消息中间件
消息中间件MOM(Message Oriented Middleware)是在分布式环境中,两个或多个独立运行的系统之间,提供消息通讯作用的中介。
如图所示,假设当我们客户端现在需要确认商品的订单,首先需要调用商品服务的方法,商品系统服务不能直接与订单系统通信.这时商品系统只能通过发送消息msg1(id=xxx)到消息中间件,当订单系统接收到消息中间件的消息msg1时就会立刻执行生成订单的方法.这就是我们消息中间件的角色
消息中间件的作用:
把各个系统之间服务的调用以消息通讯的方式交互
消息中间件的特点:
1、消息异步接收,消息发送者不需要等待消息接收着的响应,提高整个应用程序的性能
2、消息可靠接收:消息发送出去后保存在一个中间容器内,只有消息接收者收到消息后才删除消息
消息中间件的主要应用场景:
在多个系统间进行通讯的时候,通常会要求:
(1)可靠传输,数据不能丢,有的时候,也会要求不能重复传输
(2)异步传输,否则各个系统同步发送接收数据,相互等待,造成系统性能低下
比较流行的消息中间件有哪些:
收费:IBM MQSeries,BEA WebLogic JMS Server,Oracle AQ
开源:ActiveMQ,RocketMQ,Kafka
ActiveMQ安装使用
下载ActiveMQ运行程序:
1、从官网上下载运行程序:http://activemq.apache.org/download.html
2、拷贝到你要安装的位置直接解压就好了
启动ActiveMQ
Windows版:双击bin目录下activemq.bat
Unix/Linux/Cygwin版本:1.需要使用终端命令(cd +apache-activemq-5.10.0的根目录 )
cd /Users/zhangshuai/Library/apache-activemq-5.10.0
- bin/activemq start
然后访问地址为localhost:8161/admin
如果需要输入账号与密码可以直接都输入admin
在conf目录中有个activemq.xml文件,该文件有3个重要设置:
1、设置ActiveMQ的消息监听端口,默认为61616
2、设置ActiveMQ的管理界面的端口,默认为8161。ActiveMQ使用的是内嵌的jetty服务器来运行它提供的管理界面的,该管理界面的访问地址为localhost:8161/admin
3、设置ActiveMQ的访问用户名和密码,默认用户名和密码都为admin。设置方法,添加如下配置:
<!-- 添加访问ActiveMQ的账号密码 -->
<plugins>
<simpleAuthenticationPlugin>
<users>
<authenticationUser username="zhangshuai" password="123456" groups="users,admins"/>
</users>
</simpleAuthenticationPlugin>
</plugins>
Springboot整合ActiveMQ
在这里我以一个简单商城为案例
1.向商品的服务端shop-server-goods和shop-server-order的pom.xml中添加springboot-ActiveMQ依赖
<!--spring-boot的activeMQ依赖-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
2.向商品的服务端shop-server-goods和shop-server-order的application.properties添加属性配置
在springboot应用程序中的主配置文件application.properties文件中添加如下ActiveMQ的属性配置
#activeMQ部署地址
spring.activemq.broker-url=tcp://127.0.0.1:61616
#activeMQ访问用户名
spring.activemq.user=admin
#activeMQ访问密码
spring.activemq.password=admin
主要我们的两个服务端都连接到同一个ActiveMQ上了
接下来我们先演示发送Queue模型的消息(以创建订单为例子)
发送queue模型的消息的关键是创建一个ActiveMQQueue对象
创建一个消息生产者类CreateOrderInfoProducer.java
/**
* 创建订单的消息生产类
*/
@Component//交给spring容器管理
public class CreateOrderInfoProducer {
//确定消息发送到哪个地点
Destination destination = new ActiveMQQueue("shop.queue.createOrderInfo");
//注入spirng帮我们封装好的JMS模板类
@Autowired
private JmsTemplate jmsTemplate;
/**
* 发送创建订单消息的方法
* userId 哪个用户下的订单
*/
public void sendMessage(Long userId){
//实用JMS模板类发送消息
jmsTemplate.convertAndSend(destination,userId);
}
}
在需要发送该消息的业务调用:
//调用发送创建订单消息的方法
createOrderInfoProducer.sendMessage(1L);
在默认情况下消息消费者监听的是Queue消息域的模型
创建订单消息的消费对象
/**
* 创建订单消息的消费对象
*/
//把消息接收者交给spring容器管理
@Component
public class CreateOrderInfoConsumer {
@Autowired
OrderInfoMapper dao;
/**
* 使用spring给我们提供的JMS监听注解,监听一个消息地点
* 注意,监听的消息地点要和消息发送到的地点一致
* @param userId 消息生产者发送过来的userId
*/
@JmsListener(destination = "shop.queue.createOrderInfo")
public void receiveMessage(Long userId) {
//做具体的业务
OrderInfo orderInfo = new OrderInfo();
orderInfo.setUserId(userId);
orderInfo.setOrderSn("222222");
orderInfo.setOrderStatus(Byte.valueOf("0"));
orderInfo.setPayStatus(Byte.valueOf("0"));
orderInfo.setShippingStatus(Byte.valueOf("0"));
dao.insert(orderInfo);
}
}
发送Topic模型的消息
消息生产者:发送Topic模型的消息的关键是创建一个ActiveMQTopic对象,还有把消息发送到的地方修改一下,其余的代码跟Queue模型是一样的
消息消费者:因为在默认情况下消息消费者监听的是Queue消息域的模型,所以想要springboot应用监听Topic消息域模型的消息,就必须修改springboot的默认配置
修改方法,在消息消费者应用的application.properties文件添加一个配置属性:
是否使用订阅和发布的模型监听器,默认为false,也就是说默认使用的是点对点的模型监听器
spring.jms.pub-sub-domain=true
还有把监听的消费接收地方修改一下,其余的代码跟Queue消息域的模型一样
Queue和Topic两种模型同时存在
新建一个配置类:springboot配置类,使ActiveMQ同时支持监听Queue和Topic模型的消息
自定义模型的监听容器bean:
在新建的配置类中添加两个bean,分别用于监听Queue和Topic两个模型的消息,并且注入到spring容器中
/**
* 用于监听Topic模型消息的容器bean
* @param connectionFactory
* @return
*/
@Bean
public JmsListenerContainerFactory<?> jmsListenerContainerTopic(ActiveMQConnectionFactory connectionFactory) {
DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory();
//是否使用订阅和发布的模型监听器,默认为false,也就是说默认使用的是点对点的模型监听器
bean.setPubSubDomain(true);
bean.setConnectionFactory(connectionFactory);
return bean;
}
/**
* 用于监听Queue模型消息的容器bean
* @param connectionFactory
* @return
*/
@Bean
public JmsListenerContainerFactory<?> jmsListenerContainerQueue(ActiveMQConnectionFactory connectionFactory) {
DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory();
bean.setConnectionFactory(connectionFactory);
return bean;
}
按需选择监听器容器
现在我们已经定义出了两个监听器的容器,分别用于监听Queue和Topic模型的消息了,现在要做的就是在消息消费者中选择自己需要监听的是哪一种模型
@JmsListener(destination = "shop.topic.createOrderInfo",containerFactory="jmsListenerContainerTopic")
在原本的@JmsListener基础上加一个属性:containerFactory,该属性值就是我们自定义监听器的Bean的方法名