Spring和ActiveMq消息队列整合详解
Spring和ActiveMq消息队列整合详解
官方主页
一、概述
消息中间件利用高效可靠的消息传递机制进行平台无关的数据交流,并基于数据通信来进行分布式系统的集成。通过提供消息传递和消息排队模型,它可以在分布式环境下扩展进程间的通信。对于消息中间件,常见的角色大致也就有Producer(生产者)、Consumer(消费者)
常见的消息中间件产品:
(1)ActiveMQ
ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现。这里介绍的是ActiveMQ的使用。
(2)RabbitMQ
AMQP协议的领导实现,支持多种场景。淘宝的MySQL集群内部有使用它进行通讯,OpenStack开源云平台的通信组件,最先在金融行业得到运用。
(3)ZeroMQ
史上最快的消息队列系统
(4)Kafka
Apache下的一个子项目 。特点:高吞吐,在一台普通的服务器上既可以达到10W/s的吞吐速率;完全的分布式系统。适合处理海量数据。
Jms
JMS即Java消息服务(Java Message Service)应用程序接口,是一个Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。Java消息服务是一个与具体平台无关的API,绝大多数MOM提供商都对JMS提供支持。
所以,ActiveMq是Jms标准的一个实现者。
本篇通过对Mq消息消费、解析并组装发送为例讲述ActiveMq的使用过程。
Git地址:
Gitee
项目地址:
品茗IT-同步发布
品茗IT:提供在线快速构建Spring项目工具和一站式Springboot项目生成。
二、环境配置
2.1 ActiveMq的安装部署
因为消息队列是对生产者消息的存储和处理,并转发给消费者,所以消息队列是独立于生产者和消费者的一个服务。
ActiveMq可以在ActiveMq官网 下载并安装。
选择需要的版本,下载完成后,解压文件(window下用winrar/其他解压工具,linux下tar命令)并进入文件夹的bin目录下:
在这里插入图片描述如图所示:
windows下可以在powershell下用:
./activemq start 或者./activemq.bat start
在cmd下只能用:
activemq.bat start
linux下可能要更改activemq文件的执行权限,然后:
./activemq start
启动后浏览器输入 http://127.0.0.1:8161/ 如图所示:
在这里插入图片描述点击 Manage ActiveMQ broker 跳转到 http://127.0.0.1:8161/admin/; 可以在里面查看消息队列等信息及统计。
2.2 项目建立
本项目将消费者和生产者整合在一起,通过先消费掉mq中的json数据,通过数据中的标识找到对应的解析处理器,解析完成后组装并生产消息到mq的另一个队列中。
2.2.1 maven依赖
<?xml version="1.0"?>
<project
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.cff</groupId>
<artifactId>springwork</artifactId>
<version>0.0.1-SNAPSHOT</version>
</parent>
<artifactId>springwork-activemq</artifactId>
<packaging>jar</packaging>
<name>springwork-activemq</name>
<url>http://maven.apache.org</url>
<dependencies>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.10.0</version>
</dependency>
</dependencies>
<build>
<finalName>springwork-activemq</finalName>
</build>
</project>
父模块可以在点击查看获取。
2.2.2 spring-activemq.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:aop="http://www.springframework.org/schema/aop"
xmlns:tx="http://www.springframework.org/schema/tx" xmlns:context="http://www.springframework.org/schema/context"
xmlns:p="http://www.springframework.org/schema/p" xmlns:cache="http://www.springframework.org/schema/cache"
xmlns:jms="http://www.springframework.org/schema/jms" xmlns:jaxws="http://cxf.apache.org/jaxws"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
http://www.springframework.org/schema/tx
http://www.springframework.org/schema/tx/spring-tx-4.0.xsd
http://www.springframework.org/schema/aop
http://www.springframework.org/schema/aop/spring-aop-4.0.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context-4.0.xsd
http://www.springframework.org/schema/cache
http://www.springframework.org/schema/cache/spring-cache-4.0.xsd
http://www.springframework.org/schema/jms
http://www.springframework.org/schema/jms/spring-jms-4.0.xsd
http://cxf.apache.org/jaxws http://cxf.apache.org/schemas/jaxws.xsd ">
<context:annotation-config />
<context:component-scan base-package="com.cff.springwork">
</context:component-scan>
<bean id="annotationPropertyConfigurerJms"
class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
<property name="order" value="1" />
<property name="ignoreUnresolvablePlaceholders" value="true" />
<property name="locations">
<list>
<value>classpath:jms.properties</value>
</list>
</property>
</bean>
<bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="${brokerURL}" />
</bean>
<bean id="connectionFactory"
class="org.springframework.jms.connection.SingleConnectionFactory">
<!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory -->
<property name="targetConnectionFactory" ref="targetConnectionFactory" />
</bean>
<bean id="defaultQueueDestination" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg value="${destQueueName}" />
</bean>
<bean id="listenQueueDestination" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg value="${listenQueueName}" />
</bean>
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 -->
<property name="connectionFactory" ref="connectionFactory" />
<property name="defaultDestination" ref="defaultQueueDestination" />
</bean>
<bean id="mqMessageListener" class="com.cff.springwork.activemq.handler.MqMessageConsumer">
<property name="handler" ref="jsonParseHander" />
</bean>
<bean id="jsonParse1001" class="com.cff.springwork.activemq.parser.JsonParse1001">
</bean>
<bean id="jsonParseHander" class="com.cff.springwork.activemq.handler.bussiness.JsonParseHander">
<property name="nextHandler" ref="bussiNessHander" />
<property name="parsers">
<map>
<entry key="1001" value-ref="jsonParse1001" />
</map>
</property>
</bean>
<bean id="bussiNessHander" class="com.cff.springwork.activemq.handler.bussiness.BussiNessHander">
<property name="nextHandler" ref="mqMessageProducer" />
</bean>
<bean id="mqMessageProducer" class="com.cff.springwork.activemq.handler.MqMessageProducer">
<property name="jmsTemplate" ref="jmsTemplate" />
</bean>
<!-- 消息监听容器 -->
<bean id="jmsContainer"
class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="concurrency" value="5-10"></property>
<property name="connectionFactory" ref="connectionFactory" />
<property name="destination" ref="listenQueueDestination" />
<property name="messageListener" ref="mqMessageListener" />
<property name="sessionTransacted" value="true"/>
</bean>
</beans>
这里面的配置可以分为连接、队列、发送、监听、容器、业务bean这几类:
-
连接:targetConnectionFactory和connectionFactory定义了连接工厂,connectionFactory只是spring对activemq工厂的封装。
-
队列:defaultQueueDestination和listenQueueDestination定义了一个发送目的队列和一个监听消息队列,如果只做发送或者只做监听,定义一个即可。
-
发送:jmsTemplate是spring对消息队列发送方式的封装,类似于JdbcTemplate、RedisTemplate、RestTemplate等封装类。mqMessageProducer是项目自己封装了调用jmsTemplate发送消息的一个bean。
-
监听:mqMessageListener是项目封装的监听消息的bean。
-
容器:jmsContainer是将消息队列和监听bean整合起来,这样就保证用的时候能找到对应的bean。
-
业务:jsonParse1001、jsonParseHander和bussiNessHander都是模拟业务逻辑的,jsonParse1001、jsonParseHander是定义对json数据的处理,bussiNessHander是模拟对业务逻辑的处理。这里使用责任链模式。
jms.properties配置文件:
brokerURL=tcp://localhost:61616
destQueueName=destQueue
listenQueueName=listenQueue
2.2.3 监听器(消费者)
前面我们定义了mqMessageListener监听器,其实就是消费者,用以接收监听队列中的消息。
MqMessageConsumer :
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
import com.cff.springwork.activemq.handler.bussiness.Handler;
import com.cff.springwork.activemq.model.JmsMessage;
public class MqMessageConsumer implements MessageListener{
private Handler handler;
public void onMessage(Message message) {
TextMessage textMsg = (TextMessage) message;
System.out.println("接收到一个纯文本消息。");
try {
System.out.println("消息内容是:" + textMsg.getText());
} catch (JMSException e) {
e.printStackTrace();
}
JmsMessage<String> jm = new JmsMessage<String>();
try {
jm.setBody(textMsg.getText());
} catch (JMSException e) {
e.printStackTrace();
}
handler.hander(jm);
}
public Handler getHandler() {
return handler;
}
public void setHandler(Handler handler) {
this.handler = handler;
}
}
2.2.4 发送方(生产者)
前面定以的bean中,mqMessageProducer是负责发送消息的,即生产消息的,发送消息放在发送的目的消息队列中,队列是如何确定的呢?在我们定义的容器bean中。
MqMessageProducer:
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import com.cff.springwork.activemq.model.JmsMessage;
import net.sf.json.JSONObject;
public class MqMessageProducer implements Handler{
private JmsTemplate jmsTemplate;
public static String RETCODESUCCESS="200";
public void hander(JmsMessage msg) {
JSONObject jSONObject = new JSONObject();
jSONObject.put("msgType", msg.getMsgType());
jSONObject.put("orderNo", msg.getOrderNo());
jSONObject.put("retCode", RETCODESUCCESS);
System.out.println("准备发送jms到["+jmsTemplate.getDefaultDestination()+"]...");
jmsTemplate.send(new MessageCreator() {
public Message createMessage(Session session) throws JMSException {
TextMessage textmsg = session.createTextMessage();
textmsg.setText(jSONObject.toString());
return textmsg;
}
});
}
public JmsTemplate getJmsTemplate() {
return jmsTemplate;
}
public void setJmsTemplate(JmsTemplate jmsTemplate) {
this.jmsTemplate = jmsTemplate;
}
}
2.2.5 业务逻辑
我们让业务逻辑的处理器都实现Handler接口:
Handler:
import com.cff.springwork.activemq.model.JmsMessage;
public interface Handler {
public void hander(JmsMessage msg);
}
监听器首先调用的是JsonParseHander,对Json数据进行解析。
JsonParseHander:
import java.util.Map;
import com.cff.springwork.activemq.model.JmsMessage;
import com.cff.springwork.activemq.parser.JsonParser;
import net.sf.json.JSONObject;
public class JsonParseHander implements Handler{
private Handler nextHandler;
private Map<String,JsonParser> parsers;
@Override
public void hander(JmsMessage msg) {
String body = (String) msg.getBody();
JSONObject json = JSONObject.fromObject(body);
String msgType = json.getString("msgType");
JsonParser jp = parsers.get(msgType);
System.out.println("消息类型为:" + msgType + "处理类:" + jp.getClass().getName());
JmsMessage jm = jp.parse(body);
System.out.println("消息为:" + jm.toString() );
if(nextHandler!=null){
System.out.println("nextHandler为:" + nextHandler.getClass().getName());
nextHandler.hander(jm);
}else{
System.out.println("nextHandler为空");
}
}
public Map<String, JsonParser> getParsers() {
return parsers;
}
public void setParsers(Map<String, JsonParser> parsers) {
this.parsers = parsers;
}
public Handler getNextHandler() {
return nextHandler;
}
public void setNextHandler(Handler nextHandler) {
this.nextHandler = nextHandler;
}
}
在spring的配置文件中,我们配置的责任链的下一个Handler是BussiNessHander。
BussiNessHander:
import com.cff.springwork.activemq.model.JmsMessage;
public class BussiNessHander implements Handler{
private Handler nextHandler;
@Override
public void hander(JmsMessage msg) {
System.out.println(msg.getBody().toString());
if(nextHandler!=null){
nextHandler.hander(msg);
}
}
public Handler getNextHandler() {
return nextHandler;
}
public void setNextHandler(Handler nextHandler) {
this.nextHandler = nextHandler;
}
}
业务逻辑处理完成后,我们将消息发送到另一个队列上,这时候,我们可以将mqMessageProducer实现Handler接口,这样我们的责任链的下一个就是mqMessageProducer(2.2.4定义的发送方)。
2.2.5 Json数据处理
因为本篇是对消息队列监听,假设传输的业务数据是Json,而且有多种业务,那样每个业务的Json数据格式都不一样,比如我们的JsonParseHander是根据Json数据中的msgType,拿到JsonParseHander中的注入的map中jsonParse${msgType}对应的bean。
这里我们先定义一个接口JsonParser:
JsonParser:
import com.cff.springwork.activemq.model.JmsMessage;
public interface JsonParser {
public JmsMessage parse(String msg);
}
然后定义一个jsonParser,JsonParse1001。
详细完整代码(实体类和Json处理实现类)可以在https://www.pomit.cn/p/178648639553792查看
快速构建项目
喜欢这篇文章么,喜欢就加入我们一起讨论Spring技术吧!
品茗IT交流群