ActiveMQ
一、概述
消息队列中间件是分布式系统中重要的组件,主要解决应用耦合,异步消息,流量削峰等问题。
实现高性能,高可用,可伸缩和最终一致性架构。是大型分布式系统不可缺少的中间件。
目前在生产环境,使用较多的消息队列有ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ等。
二、消息队列的应用场景
1、异步处理:用户注册后,需要发送注册邮件和注册短信,传统的做法有两种:串行和并行,引入消息队列后,异步处理,直接返回,提高效率
2、应用解耦:用户下单后,订单系统需要通知库存系统,传统的做法是订单系统调用库存系统的接口,引入消息队列后订单系统将消息写入消息队列后直接返回,库存系统从消息队列中获取订单信息,这样就实现了订单系统与库存系统的解耦。
3、流量削峰:秒杀与团抢活动中,可以在应用前端加入消息队列,可以控制活动的人数,缓解短时间内高流量压垮应用。用户的请求,服务器接收后,首先写入消息队列中。加入消息队列长度超过最大数量,则直接抛弃用户请求或跳转到错误页面。秒杀业务根据消息队列中的请求信息,再做后续处理。
4、日志处理:日志采集客户端,负责日志数据采集,定时写入Kafka队列;Kafka消息队列,负责日志数据的接收,存储和转发;日志处理应用:订阅并消费kafka队列中的日志数据。
5、消息通讯:消息通讯是指,消息队列一般都内置了高效的通信机制,因此也可以用在纯的消息通讯。比如实现点对点消息队列,或者聊天室等。
三、ActiveMQ
1、简介
ActiveMQ出身名门,是Apache门下的最流行的,能力强劲的开源消息总线。完全支持JMS1.1和J2EE1.4规范的JMS Provide实现。它从设计上保证了高性能的集群,当然实现了JMS的P2P与PubSub两种开发模式。
2、安装并运行
去官网http://activemq.apache.org/components/classic/download/下载ActiveMQ的最新版本。解压,进入到bin/win64文件夹下,并启动activemq.bat,在浏览器输入:http://localhost:8161/即可看到ActiveMQ的管理界面。点击:Manage ActiveMQ broker登录管理后台:(用户名、密码默认都为admin)。登录进去以后我们可以点击Queues看到消息队列以及消费者情况等。
4、代码示例
4.1 添加依赖
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.qianfeng</groupId>
<artifactId>507activemq</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.15.12</version>
</dependency>
</dependencies>
</project>
由于不是spring项目,只是一个简单的spring项目,所以只添加了activemq-all这个依赖
4.2 P2P模式
消息队列有两种模式,一种称为p2p,点对点的模式,另一种称为pubsub(publish、subscribe)模式,一对多的模式,这两种模式各有特点。
生产者:
TestProvider.java
package com.qianfeng;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.junit.Test;
import javax.jms.*;
public class TestProvider {
@Test
public void testProvider(){
//使用缺省的用户名、密码和url创建连接工厂
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
ActiveMQConnectionFactory.DEFAULT_USER,
ActiveMQConnection.DEFAULT_PASSWORD,
ActiveMQConnection.DEFAULT_BROKER_URL
);
try {
//通过连接工厂创建连接对象
Connection connection = connectionFactory.createConnection();
//启动连接
connection.start();
//通过连接对象创建会话对象,第一个参数位是否开启事务
Session session = connection.createSession(Boolean.TRUE,Session.AUTO_ACKNOWLEDGE);
//P2P模式使用的是Queue
Queue queue = session.createQueue("names");
MessageProducer producer = session.createProducer(queue);
sendMessage(session,producer);
session.commit();
} catch (JMSException e) {
e.printStackTrace();
}
}
private void sendMessage(Session session, MessageProducer producer) {
try {
for (int i = 0; i < 10; i++) {
TextMessage message = session.createTextMessage("zhangsan"+i);
producer.send(message);
}
}catch (Exception e){
e.printStackTrace();
}
}
}
TestConsumer.java
package com.qianfeng;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.junit.Test;
import javax.jms.*;
public class TestConsumer {
@Test
public void testConsumer(){
//使用缺省的用户名、密码和url创建连接工厂
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
ActiveMQConnectionFactory.DEFAULT_USER,
ActiveMQConnection.DEFAULT_PASSWORD,
ActiveMQConnection.DEFAULT_BROKER_URL
);
try {
//通过连接工厂创建连接对象
Connection connection = connectionFactory.createConnection();
//启动连接
connection.start();
//通过连接对象创建会话对象,第一个参数为是否开启事务
Session session = connection.createSession(Boolean.TRUE,Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue("names");
//获取consumer
MessageConsumer consumer = session.createConsumer(queue);
while (true){
TextMessage message = (TextMessage) consumer.receive(10000);
if(message != null){
System.out.println(message.getText());
}else {
break;
}
}
} catch (JMSException e) {
e.printStackTrace();
}
}
}
P2P模式的特点是消费者一定会收到,类似现实中的短信。
4.3 PubSub模式
PubSub模式类似现实中的广播,需要先启动消费者,再启动生产者,消费者才能收到消息。
TestTopicProvider.java
package com.qianfeng;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.junit.Test;
import javax.jms.*;
public class TestTopicProvider {
@Test
public void testTopicProducer(){
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://127.0.0.1:61616");
try {
Connection connection = connectionFactory.createConnection();
connection.start();
Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic("test-topic");
MessageProducer producer = session.createProducer(topic);
sendMessage(session,producer);
session.commit();
} catch (JMSException e) {
e.printStackTrace();
}
}
private void sendMessage(Session session, MessageProducer producer) {
try {
for (int i = 0; i < 10; i++) {
TextMessage message = session.createTextMessage("zhangsan"+i);
System.out.println("activeMQ发送消息"+i);
Thread.sleep(5000);
producer.send(message);
}
}catch (Exception e){
e.printStackTrace();
}
}
}
TestTopicConsumer.java
package com.qianfeng;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.junit.Test;
import javax.jms.*;
public class TestTopicConsumer {
@Test
public void testTopicConsumer(){
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://127.0.0.1:61616");
try {
Connection connection = connectionFactory.createConnection();
connection.start();
Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic("test-topic");
MessageConsumer consumer = session.createConsumer(topic);
while (true){
TextMessage message = (TextMessage) consumer.receive(10000);
if(message!=null){
System.out.println("接收到消息:"+message.getText());
}else {
break;
}
}
} catch (JMSException e) {
e.printStackTrace();
}
}
}