第十五篇:使用ActiveMQ
前言:
前面我们可以利用Solr集群来实现我们的搜索功能,但是有没有发现在我们每次添加一个新的商品的时候都要重新导入一次索引库,效率非常低。这就需要我们优化一下我们的方案。我们想到最好就是我们添加商品的时候可以单独将该商品同步到索引库。那么我们可以想象有以下几种方案:
方案一:在taotao-manager中,添加商品的业务逻辑中,添加一个同步索引库的业务逻辑。
缺点:业务逻辑耦合度高,业务拆分不明确
方案二:业务逻辑在taotao-search中实现,调用服务在taotao-manager实现。业务逻辑分开。
缺点:服务之间的耦合度变高。服务的启动有先后顺序。
方案三:使用消息队列。MQ是一个消息中间件。如图
image.png
怎么理解消息中间件呢?我们可以把它理解为一个秘书,消息的发布者就是大老板,大老板下午三点要开个会,他只需跟秘书说一声,下午三点,我要开个会,就行了,老板不用管秘书是怎样通知各项目经理的,也不用管项目经理要带什么材料,他所做的只是告诉秘书一声而已。秘书负责与各个项目经理联系,告诉各个项目经理应该准备什么。MQ便相当于"秘书"这个角色。当添加一个商品时,商品服务只需要告诉消息中间件MQ,MQ便去通知其它服务做各自该做的事情,比如通知搜索服务去同步索引库,通知redis服务去同步缓存,通知生成静态页面等等。
常见的作为MQ中间件的有:ActiveMQ、RabbitMQ、Kafka。
1.什么是ActiveMQ
ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现,尽管JMS规范出台已经是很久的事情了,但是JMS在当今的J2EE应用中间仍然扮演着特殊的地位。
主要特点:
- 多种语言和协议编写客户端。语言: Java, C, C++, C#, Ruby, Perl, Python, PHP。应用协议: OpenWire,Stomp REST,WS Notification,XMPP,AMQP
- 完全支持JMS1.1和J2EE 1.4规范 (持久化,XA消息,事务)
- 对Spring的支持,ActiveMQ可以很容易内嵌到使用Spring的系统里面去,而且也支持Spring2.0的特性
- 通过了常见J2EE服务器(如 Geronimo,JBoss 4, GlassFish,WebLogic)的测试,其中通过JCA 1.5 resource adaptors的配置,可以让ActiveMQ可以自动的部署到任何兼容J2EE 1.4 商业服务器上
- 支持多种传送协议:in-VM,TCP,SSL,NIO,UDP,JGroups,JXTA
- 支持通过JDBC和journal提供高速的消息持久化
- 从设计上保证了高性能的集群,客户端-服务器,点对点
- 支持Ajax
- 支持与Axis的整合
- 可以很容易得调用内嵌JMS provider,进行测试
2.ActiveMQ的消息形式
对于消息的传递有两种类型:
一种是点对点的,即一个生产者和一个消费者一一对应;
另一种是发布/订阅模式,即一个生产者产生消息并进行发送后,可以由多个消费者进行接收。
JMS定义了五种不同的消息正文格式,以及调用的消息类型,允许你发送并接收以一些不同形式的数据,提供现有消息格式的一些级别的兼容性。我们用的最多的就是TextMessage而已。
- StreamMessage -- Java原始值的数据流
- MapMessage--一套名称-值对
- TextMessage--一个字符串对象
- ObjectMessage--一个序列化的 Java对象
-
BytesMessage--一个字节的数据流
image.png
3.ActiveMQ的安装
1.进入http://activemq.apache.org/下载ActiveMQ
使用的版本是5.12.0
2.安装环境
1.需要jdk
2.Linux虚拟机
3.安装步骤
第一步:先把ActiveMQ的压缩包上传到Linux系统
第二步:解压
第三步:启动ActiveMQ
[root@localhost bin]# ./activemq start
关闭:
[root@localhost bin]# ./activemq stop
查看状态:
[root@localhost bin]# ./activemq status
4.进入管理后台:
(http://192.168.208.40:8161/admin)
用户名:admin
密码:admin
有时候我们可能会遇到503错误的问题
解决方法:
1、查看机器名
[root@itcast168 bin]# cat /etc/sysconfig/network
NETWORKING=yes
HOSTNAME=itcast168
2、修改host文件
[root@itcast168 bin]# cat /etc/hosts
127.0.0.1 localhost localhost.localdomain localhost4 localhost4.localdomain4 itcast168
::1 localhost localhost.localdomain localhost6 localhost6.localdomain6
[root@itcast168 bin]#
3、重启Activemq服务
4.测试ActiveMQ
4.1 Queue
4.1.1生产者:生产消息,发送端
1.把jar包添加到依赖中,使用5.11.2版本的包(为什么用这个版本的包?因为5.12的话里面有spring的集成,会影响我们本来的架构)
<!--ActivqMq组件-->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
</dependency>
步骤:
第一步:创建ConnectionFactory对象,需要指定服务端ip及端口号。
第二步:使用ConnectionFactory对象创建一个Connection对象。
第三步:开启连接,调用Connection对象的start方法。
第四步:使用Connection对象创建一个Session对象。
第五步:使用Session对象创建一个Destination对象(topic、queue),此处创建一个Queue对象。
第六步:使用Session对象创建一个Producer对象。
第七步:创建一个Message对象,创建一个TextMessage对象。
第八步:使用Producer对象发送消息。
第九步:关闭资源。
//测试消息队列的发送者
@Test
public void testActiveMqProducer() throws Exception{
//1.创建一个连接的工厂对象
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.208.40:61616");
//2.使用ConnectionFactory创建一个Connection对象
Connection connection = connectionFactory.createConnection();
//3.开启连接
connection.start();
//4.使用Connection对象创建一个Session对象
//第一个参数:是否开启事务(分布式事务)。true:开启事务,第二个参数忽略。
//第二个参数:当第一个参数为false时,才有意义。消息的应答模式。1、自动应答2、手动应答。一般是自动应答。
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//5.使用Session对象创建一个Destination对象,(topic、queue),此处创建一个Queue对象
//topic的话就是
// Destination topic = session.createTopic();
Destination queue = session.createQueue("test-queue");
//6.使用Session对象创建一个Producer对象。
MessageProducer producer = session.createProducer(queue);
//7.创建一个Message对象,创建一个TextMessage对象
TextMessage textMessage = session.createTextMessage("hello activeMq,this is my first 1");
//8.使用生产者发送信息
producer.send(textMessage);
//9.关闭资源
producer.close();
session.close();
connection.close();
}
4.1.2 消费者:接收信息
第一步:创建一个ConnectionFactory对象。
第二步:从ConnectionFactory对象中获得一个Connection对象。
第三步:开启连接。调用Connection对象的start方法。
第四步:使用Connection对象创建一个Session对象。
第五步:使用Session对象创建一个Destination对象。和发送端保持一致queue,并且队列的名称一致。
第六步:使用Session对象创建一个Consumer对象。
第七步:接收消息。
第八步:打印消息。
第九步:关闭资源
//测试消息的接受者
@Test
public void testQueueConsumer() throws Exception{
//1.创建一个连接的工厂对象
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.208.40:61616");
//2.使用ConnectionFactory创建一个Connection对象
Connection connection = connectionFactory.createConnection();
//3.开启连接
connection.start();
//4.使用Connection对象创建一个Session对象
//第一个参数:是否开启事务(分布式事务)。true:开启事务,第二个参数忽略。
//第二个参数:当第一个参数为false时,才有意义。消息的应答模式。1、自动应答2、手动应答。一般是自动应答。
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//5.使用Session对象创建一个Destination对象,(topic、queue),此处创建一个Queue对象
Destination queue = session.createQueue("test-queue");
//6.使用Session对象创建一个Consumer对象。
MessageConsumer consumer = session.createConsumer(queue);
//7.接收消息
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
try {
TextMessage textMessage = (TextMessage) message;
String text = null;
text = textMessage.getText();
System.out.println(text);
} catch (JMSException e) {
e.printStackTrace();
}
}
});
System.in.read();
//9.关闭资源
consumer.close();
session.close();
connection.close();
}
image.png
运行生产者
image.png
运行消费者
image.png
我们可以观察到数目发生了变化
image.png
4.2 Topic
4.2.1 Producer
消费者:接收消息。
第一步:创建一个ConnectionFactory对象。
第二步:从ConnectionFactory对象中获得一个Connection对象。
第三步:开启连接。调用Connection对象的start方法。
第四步:使用Connection对象创建一个Session对象。
第五步:使用Session对象创建一个Destination对象。和发送端保持一致topic,并且话题的名称一致。
第六步:使用Session对象创建一个Consumer对象。
第七步:接收消息。
第八步:打印消息。
第九步:关闭资源
@Test
public void testTopicProducer() throws JMSException{
//1.创建一个连接工厂对象ConnectionFactory对象。需要指定mq服务的ip及端口号。注意参数brokerURL的开头是
//tcp://而不是我们通常的http://,端口是61616而不是我们访问activemq后台管理页面所使用的8161
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.208.40:61616");
//2.使用ConnectionFactory创建一个连接Connection对象
Connection connection = connectionFactory.createConnection();
//3.开启连接。调用Connection对象的start方法
connection.start();
//4.使用Connection对象创建一个Session对象
//第一个参数是是否开启事务,一般不使用分布式事务,因为它特别消耗性能,而且顾客体验特别差,现在互联网的
//做法是保证数据的最终一致(也就是允许暂时数据不一致),比如顾客下单购买东西,一旦订单生成完就立刻响应给用户
//下单成功。至于下单后一系列的操作,比如通知会计记账、通知物流发货、商品数量同步等等都先不用管,只需要
//发送一条消息到消息队列,消息队列来告知各模块进行相应的操作,一次告知不行就两次,直到完成所有相关操作为止,这
//也就做到了数据的最终一致性。如果第一个参数为true,那么第二个参数将会被忽略掉。如果第一个参数为false,那么
//第二个参数为消息的应答模式,常见的有手动和自动两种模式,我们一般使用自动模式。
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//5.使用Session对象创建一个Destination对象,两种形式queue、topic。现在我们使用topic
//参数就是消息队列的名称
Topic topic = session.createTopic("test-topic");
//6.使用Session对象创建一个Producer对象
MessageProducer producer = session.createProducer(topic);
//7.创建一个TextMessage对象
//有两种方式,第一种方式:
// TextMessage textMessage = new ActiveMQTextMessage();
// textMessage.setText("hello,activemq!!!");
//第二种方式:
TextMessage textMessage = session.createTextMessage("hello,activemq topic");
//8.发送消息
producer.send(textMessage);
//9.关闭资源
producer.close();
session.close();
connection.close();
}
4.2.2 消费者
@Test
public void testTopicConsumer() throws Exception{
//1.创建一个连接工厂对象ConnectionFactory对象。需要指定mq服务的ip及端口号。注意参数brokerURL的开头是
//tcp://而不是我们通常的http://,端口是61616而不是我们访问activemq后台管理页面所使用的8161
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.156.30:61616");
//2.使用ConnectionFactory创建一个连接Connection对象
Connection connection = connectionFactory.createConnection();
//3.开启连接。调用Connection对象的start方法
connection.start();
//4.使用Connection对象创建一个Session对象
//第一个参数是是否开启事务,一般不使用分布式事务,因为它特别消耗性能,而且顾客体验特别差,现在互联网的
//做法是保证数据的最终一致(也就是允许暂时数据不一致),比如顾客下单购买东西,一旦订单生成完就立刻响应给用户
//下单成功。至于下单后一系列的操作,比如通知会计记账、通知物流发货、商品数量同步等等都先不用管,只需要
//发送一条消息到消息队列,消息队列来告知各模块进行相应的操作,一次告知不行就两次,直到完成所有相关操作为止,这
//也就做到了数据的最终一致性。如果第一个参数为true,那么第二个参数将会被忽略掉。如果第一个参数为false,那么
//第二个参数为消息的应答模式,常见的有手动和自动两种模式,我们一般使用自动模式。
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//5.使用Session对象创建一个Destination对象,两种形式queue、topic。现在我们使用queue
//参数就是消息队列的名称
Topic topic = session.createTopic("test-topic");
//6.使用Session对象创建一个Consumer对象
MessageConsumer consumer = session.createConsumer(topic);
//7.向Consumer对象中设置一个MessageListener对象,用来接收消息
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
if(message instanceof TextMessage) {
TextMessage textMessage = (TextMessage)message;
try {
String text = textMessage.getText();
System.out.println(text);
} catch (JMSException e) {
e.printStackTrace();
}
}
}
});
//8.程序等待接收用户结束操作
//程序自己并不知道什么时候有消息,也不知道什么时候不再发送消息了,这就需要手动干预,
//当我们想停止接收消息时,可以在控制台输入任意键,然后回车即可结束接收操作(也可以直接按回车)。
System.out.println("topic消费者1111。。。。。");
System.in.read();
//9.关闭资源
consumer.close();
session.close();
connection.close();
}
topic形式的话会存在一个问题就是不能持久化,也就是如果消息发送者发送消息的时候,如果没有消费者运行的话,它将无法消费者条消息。(即就算消费者没有及时消费的话,就算再启动也无法获得生产者传过来的消息)。如何解决的话我们可以考虑将数据保存在磁盘这样就不会丢失了。