JMS-ActiveMQ-Demo
慕课网Java消息中间件笔记
安装ActiveMQ
下载ActiveMQ并解压至任意目录,如/home/apache-activemq/
运行ActiveMQ
进入到ActiveMQ主目录下的bin目录。
如:/home/apache-activemq/bin
执行如下命令运行:
如图执行成功$ ./activemq start
尝试进入ActiveMQ主页
默认端口:8161
ActiveMQ主页
点击Manage ActiveMQ broker进入管理页面,默认用户名和密码都是admin
管理页面
至此安装成功
Java使用ActiveMQ
新建Maven项目,并引入如下依赖
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.15.3</version>
</dependency>
队列模式
-
项目结构
项目结构 - 消息提供者
消息提供者向消息中间件发送消息,需要配置消息服务器的地址和队列名称。
package queue;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
* @Author: WJ
* @Description: 消息提供者 向消息中间件发送消息
* 61616是activemq默认端口
* @Date: Created in 上午10:45 2018/2/6
*/
public class AppProducer {
private static final String URL = "tcp://192.168.58.3:61616";
private static final String queueName = "queue-test";
public static void main(String[] args) throws JMSException {
//创建ConnectionFactory
ConnectionFactory factory = new ActiveMQConnectionFactory(URL);
//创建Connection
Connection connection = factory.createConnection();
//建立连接
connection.start();
//创建会话
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//创建一个目标
Destination destination = session.createQueue(queueName);
//创建一个生产者
MessageProducer producer = session.createProducer(destination);
for (int i = 0; i < 100; i ++) {
//创建消息
TextMessage message = session.createTextMessage("test" + i);
producer.send(message);
//打印发送的消息
System.out.println("发送消息:" + message.getText());
}
//关闭连接
connection.close();
}
}
运行消息提供者。
进入ActiveMQ后台的Queue选项查看刚才发送的消息。此时看到名为queue-test的队列中有100条消息。
接下来创建消费者消费队列中的消息。
查看消息
- 消息消费者
同样需要指定消息服务器的地址,以及需要消费的队列名称。
package queue;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
* @Author: WJ
* @Description: 消费者
* @Date: Created in 上午11:19 2018/2/6
*/
public class AppConsumer {
private static final String URL = "tcp://192.168.58.3:61616";
private static final String queueName = "queue-test";
public static void main(String[] args) throws JMSException {
//创建ConnectionFactory
ConnectionFactory factory = new ActiveMQConnectionFactory(URL);
//创建Connection
Connection connection = factory.createConnection();
//建立连接
connection.start();
//创建会话
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//创建一个目标
Destination destination = session.createQueue(queueName);
//创建一个消费者
MessageConsumer consumer = session.createConsumer(destination);
//创建监听器
consumer.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
TextMessage textMessage = (TextMessage) message;
try {
//获取消息并打印
String text = textMessage.getText();
System.out.println("接收到的消息:" + text);
} catch (JMSException e) {
e.printStackTrace();
}
}
});
/**
* 消息异步接收
* 若在此处关闭连接
* 可能会只接收到部分消息后
* 因连接关闭而不能接收消息队列中的全部消息
*/
// connection.close();
}
}
运行消费者,并观察运行结果。
接收到的消息
可以看到队列中的100条消息已经全部接收到了。
ActiveMQ后台也显示queue-test这个队列中已经没有消息了,并且有一个消费者在线,100个消息出队列。
后台情况
-
运行多个消费者
这次先启动3个消费者,看看三个消费者接收到的消息结果是什么样的。
启动三个消费者
运行提供者,发送100条消息。
消费者1
消费者1
消费者2
消费者2
消费者3
消费者3
可以发现100条消息被依次发送给了三个消费者。
队列模型
主题模式
-
项目结构
项目结构 - 消息发布者
该代码与之前队列模式的发布者十分相似,只是在创建目标的环节由创建队列(createQueue)改为了创建主题(createTopic)。
//创建一个目标
Destination destination = session.createTopic(topicName);
package topic;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
* @Author: WJ
* @Description: 消息提供者 向消息中间件发送消息
* 61616是activemq默认端口
* @Date: Created in 上午10:45 2018/2/6
*/
public class AppProducer {
private static final String URL = "tcp://192.168.58.3:61616";
private static final String topicName = "topic-test";
public static void main(String[] args) throws JMSException {
//创建ConnectionFactory
ConnectionFactory factory = new ActiveMQConnectionFactory(URL);
//创建Connection
Connection connection = factory.createConnection();
//建立连接
connection.start();
//创建会话
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//创建一个目标
Destination destination = session.createTopic(topicName);
//创建一个生产者
MessageProducer producer = session.createProducer(destination);
for (int i = 0; i < 100; i ++) {
//创建消息
TextMessage message = session.createTextMessage("test" + i);
producer.send(message);
//打印发送的消息
System.out.println("发送消息:" + message.getText());
}
//关闭连接
connection.close();
}
}
- 创建消费者
同样在创建目标时使用createTopic
Destination destination = session.createTopic(topicName);
package topic;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
* @Author: WJ
* @Description: 消费者
* @Date: Created in 上午11:19 2018/2/6
*/
public class AppConsumer {
private static final String URL = "tcp://192.168.58.3:61616";
private static final String topicName = "topic-test";
public static void main(String[] args) throws JMSException {
//创建ConnectionFactory
ConnectionFactory factory = new ActiveMQConnectionFactory(URL);
//创建Connection
Connection connection = factory.createConnection();
//建立连接
connection.start();
//创建会话
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//创建一个目标
Destination destination = session.createTopic(topicName);
//创建一个消费者
MessageConsumer consumer = session.createConsumer(destination);
//创建监听器
consumer.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
TextMessage textMessage = (TextMessage) message;
try {
//获取消息并打印
String text = textMessage.getText();
System.out.println("接收到的消息:" + text);
} catch (JMSException e) {
e.printStackTrace();
}
}
});
/**
* 消息异步接收
* 若在此处关闭连接
* 可能会只接收到部分消息后
* 因连接关闭而不能接收消息队列中的全部消息
*/
// connection.close();
}
}
-
测试
我们仿照之前的测试步骤,先启动消息提供者。
进入到ActiveMQ后台,进入Topic页面,发现名为topic-test的主题中已经有了100条消息。
ActiveMQ后台
再启动消费者。
此时看到后台中已经显示有一名消费者在线,但并没有消息被消费。
ActiveMQ后台
同时在消费者的控制台中也没有任何信息打印出来,说明确实没有获取到消息。
消费者后台 -
问题原因
该现象产生的原因就是主题模式下,消费者无法接收到在它订阅该主题时刻之前的主题中的消息,只能接收到订阅时刻后主题中的消息。 -
再次测试
先启动消费者订阅topic-test主题,再让生产者提供新的消息。发现成功接收所有消息。
成功接收消息 -
启动多个消费者
这次我们依然启动三个消费者。发现三个消费者都接收到了同样的100条消息。
消费者1
消费者1
消费者2
消费者2
消费者3
消费者3
-
主题模式示意
主题模型
SpringBoot整合ActiveMQ
新建项目
勾选JMS(ActiveMQ)
新建项目
配置ActiveMQ连接
spring:
activemq:
broker-url: tcp://192.168.58.3:61616
close-timeout: 5000
in-memory: false
pool:
max-connections: 100
# enabled: true
send-timeout: 3000
broker-url即访问远程消息服务器的地址,同时需要关闭内存消息服务器(in-memory=false)
注释掉的配置(spring.activemq.pool.enabled)如果配置为true,则需要额外引入如下依赖:
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-pool</artifactId>
<version>5.15.3</version>
</dependency>
启动类开启JMS支持
在启动类上添加@EnableJMS
注解
队列模式
-
配置队列名称
队列名称 - 创建生产者
这里引入了Spring提供的JmsMessagingTemplate和刚才创建的执行队列消息的对象。并使用jmsMessagingTemplate.convertAndSend()发送消息到消息服务器。
package dev.wj.springbootjmsdemo.queue;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.stereotype.Component;
import javax.jms.Queue;
/**
* @Author: WJ
* @Description:
* @Date: Created in 下午2:44 2018/2/6
*/
@Component
public class QueueProducer {
@Autowired
private JmsMessagingTemplate jmsMessagingTemplate;
@Autowired
private Queue queue;
public void send(String message) {
System.out.println("发送消息:" + message);
jmsMessagingTemplate.convertAndSend(this.queue, message);
}
}
- 创建消费者
此处使用@JmsListener(destination = "")
注解监听我们的队列。destination属性即要监听的队列名。
package dev.wj.springbootjmsdemo.queue;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
/**
* @Author: WJ
* @Description:
* @Date: Created in 下午2:47 2018/2/6
*/
@Component
public class QueueConsumer {
@JmsListener(destination = "spring-queue")
public void receive(String text) {
System.out.println("接收到消息:" + text);
}
}
- 测试
package dev.wj.springbootjmsdemo.queue;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
/**
* @Author: WJ
* @Description:
* @Date: Created in 下午2:49 2018/2/6
*/
@RunWith(SpringRunner.class)
@SpringBootTest
public class QueueTest {
@Autowired
private QueueProducer queueProducer;
@Test
public void testQueue() {
queueProducer.send("This is SpringBoot JMS Queue");
}
}
控制台结果:
控制台结果
ActiveMQ后台结果
ActiveMQ后台结果
可以看到刚才创建的队列消息已经出现了。
订阅模式
-
配置主题
配置主题 -
创建发布者
package dev.wj.springbootjmsdemo.topic;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.stereotype.Component;
import javax.jms.Topic;
/**
* @Author: WJ
* @Description:
* @Date: Created in 下午2:52 2018/2/6
*/
@Component
public class TopicProducer {
@Autowired
private JmsMessagingTemplate jmsMessagingTemplate;
@Autowired
private Topic topic;
public void send(String text) {
System.out.println("topic发送消息:" + text);
jmsMessagingTemplate.convertAndSend(this.topic, text);
}
}
- 创建订阅者
这里需要注意的就是需要为订阅者的监听指定containerFactory才能正确地接收主题中的消息。
package dev.wj.springbootjmsdemo.topic;
import org.springframework.context.annotation.Bean;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.config.JmsListenerContainerFactory;
import org.springframework.stereotype.Component;
import javax.jms.ConnectionFactory;
/**
* @Author: WJ
* @Description:
* @Date: Created in 下午2:53 2018/2/6
*/
@Component
public class TopicConsumer {
/**
* 为主题订阅者Listener指定containerFactory
* @param connectionFactory
* @return
*/
@Bean
public JmsListenerContainerFactory jmsListenerContainerTopic(ConnectionFactory connectionFactory) {
DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory();
bean.setPubSubDomain(true);
bean.setConnectionFactory(connectionFactory);
return bean;
}
@JmsListener(destination = "spring-topic", containerFactory = "jmsListenerContainerTopic")
public void receive(String text) {
System.out.println("topic接收到消息:" + text);
}
}
- 测试
package dev.wj.springbootjmsdemo.topic;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
/**
* @Author: WJ
* @Description:
* @Date: Created in 下午2:55 2018/2/6
*/
@RunWith(SpringRunner.class)
@SpringBootTest
public class TopicTest {
@Autowired
private TopicProducer topicProducer;
@Test
public void testTopic() {
topicProducer.send("This is SpringBoot JMS Topic");
}
}
-
执行结果
控制台:
控制台结果
ActiveMQ后台结果:
ActiveMQ后台结果