Java消息服务 —ActiveMQ实战
1.JMS—Java消息服务
Java消息服务(Java Message Service,JMS)应用程序接口是一个Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。Java消息服务是一个与具体平台无关的API,绝大多数MOM提供商都对JMS提供支持。
JMS消息服务的规范包含两种消息模式
- 点对点
- 发布者/订阅者
常用的消息中间件
- ActiveMQ
- kafka
- RabbitMQ
- RocketMQ
JMS API
- ConnectionFactory
- Connection
- Session
- Destination
目标是一个包装了消息目标标识符的被管对象,消息目标是指消息发布和接收的地点,或者是队列,或者是主题 - MessageConsumer
由会话创建的对象,用于接收发送到目标的消息 - MessageProducer
由会话创建的对象,用于发送消息到目标 - Message
是在消费者和生产者之间传送的对象,也就是说从一个应用程序创送到另一个应用程序
消息的类型
- StreamMessage
Java原始数据流 - MapMessage
键值对 - TextMessage
字符串对象 - ObjectMessage
序列化对象 - ByteMessage
字节数据流
2.windows环境下载安装ActiveMQ
下载界面• 下载 http://activemq.apache.org/download.html
• 解压文件夹,双击 home/bin/winXX/wrapper.exe 进行启动
• 浏览器中访问 http://localhost:8161
• 管理员账号和密码为 admin / admin
apache-activemq-5.15.2解压后的文件
bin存放的是脚本文件
conf存放的是基本配置文件
data存放的是日志文件
docs存放的是说明文档
examples存放的是简单的实例
lib存放的是activemq所需jar包
webapps用于存放项目的目录
ActiveMQ默认启动时,启动了内置的jetty服务器,提供一个用于监控ActiveMQ的admin应用。 双击 home/bin/winXX/wrapper.exe 进行启动:
启动ActiveMQ
浏览器中访问 http://localhost:8161:
管理员账号和密码为 admin / admin进行登录:
image.png
到这里为止,ActiveMQ 服务端就启动完毕了。
3.创建JMS-ActiveMQ工程
添加maven依赖:
maven依赖
3.1点对点消息
点对点消息在点对点或队列模型下,一个生产者向一个特定的队列发布消息,一个消费者从该队列中读取消息。这里,生产者知道消费者的队列,并直接将消息发送到消费者的队列。这种模式被概括为:
• 只有一个消费者将获得消息
• 生产者不需要在接收者消费该消息期间处于运行状态,接收者也同样不需要在消息发送时处于运行状态。
• 每一个成功处理的消息都由接收者签收
①创建消息生产者
public void producer() throws JMSException {
//1.创建ConnectionFactory 连接工厂
String brokerUrl = "tcp://localhost:61616";
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerUrl);
//2.创建Connection 连接对象
Connection connection = connectionFactory.createConnection();
//开启连接
connection.start();
//3.创建Session事务管理,通过参数设置事务级别
Session session = connection.createSession(true,Session.AUTO_ACKNOWLEDGE);
//4.创建Destination 目的地对象
Destination destination = session.createQueue("text-Message");
//5.创建消息的生产者
MessageProducer messageProducer = session.createProducer(destination);
//6.创建一条消息
TextMessage textMessage = session.createTextMessage("测试-消息生产者");
//7.发送消息
messageProducer.send(textMessage);
//提交事务
session.commit();
//8.释放资源
messageProducer.close();
session.close();
connection.close();
}
②创建消息消费者
public void consumer() throws JMSException, IOException {
//1.创建ConnectionFactory
String brokerUrl = "tcp://localhost:61616";
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerUrl);
//2.创建Connection
Connection connection = connectionFactory.createConnection();
connection.start();
//3.创建Session
Session session = connection.createSession(true,Session.AUTO_ACKNOWLEDGE);
//4.创建Destination 目的地对象
Destination destination = session.createQueue("text-Message");
//5.创建消费者
MessageConsumer messageConsumer = session.createConsumer(destination);
//6.消费消息,监听队列中的消息,若有新消息,会执行onMessage方法
messageConsumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
TextMessage textMessage = (TextMessage) message;
try {
System.out.println("消息:" + textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
});
//避免单元测试停止
System.in.read();
//7.释放资源
messageConsumer.close();
session.close();
connection.close();
}
③运行ActiveMQ项目
生产者运行结果:
消费者运行结果:
消费者
3.2发布/订阅 消息
发布/订阅 消息发布者/订阅者模型支持向一个特定的消息主题发布消息。0或多个订阅者可能对接收来自特定消息主题的消息感兴趣。在这种模型下,发布者和订阅者彼此不知道对方。这种模式被概括为:
• 多个消费者可以获得消息
• 在发布者和订阅者之间存在时间依赖性。发布者需要建立一个订阅(subscription),以便客户能够购订阅。订阅者必须保持持续的活动状态以接收消息。
消息发布者:
//1. 创建连接工厂
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
//2. 创建连接 并 开启
Connection connection = connectionFactory.createConnection();
connection.start();
//3. 创建 Session
Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
//4. 创建 Topic 对象
Topic topic = session.createTopic("weixin-Topic");
//5. 创建生产者
MessageProducer producer = session.createProducer(topic);
//6. 发送消息
TextMessage textMessage = session.createTextMessage("Hello,Topic MQ");
producer.send(textMessage);
//7. 释放资源
producer.close();
session.close();
connection.close();
消息订阅者:
//1. 创建连接工厂
ConnectionFactory connectionFactory = new
ActiveMQConnectionFactory(ActiveMQConnectionFactory.DEFAULT_BROKER_URL);
//2. 创建并启动连接
Connection connection = connectionFactory.createConnection();
connection.start();
//3. 创建 Session
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//4. 创建目的地对象
Topic topic = session.createTopic("weixin-Topic");
//5. 创建消费者
MessageConsumer consumer = session.createConsumer(topic);
//6. 获取消息
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
TextMessage textMessage = (TextMessage) message;
try {
System.out.println(textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
});
System.in.read();
//7. 释放资源
consumer.close();
session.close();
connection.close();