Spring之路

Spring和ActiveMq消息队列整合详解

2019-05-21  本文已影响2人  逍遥天扬

Spring和ActiveMq消息队列整合详解

官方主页

Spring

ActiveMq

一、概述

消息中间件利用高效可靠的消息传递机制进行平台无关的数据交流,并基于数据通信来进行分布式系统的集成。通过提供消息传递和消息排队模型,它可以在分布式环境下扩展进程间的通信。对于消息中间件,常见的角色大致也就有Producer(生产者)、Consumer(消费者)

常见的消息中间件产品:

(1)ActiveMQ

ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现。这里介绍的是ActiveMQ的使用。

(2)RabbitMQ

AMQP协议的领导实现,支持多种场景。淘宝的MySQL集群内部有使用它进行通讯,OpenStack开源云平台的通信组件,最先在金融行业得到运用。

(3)ZeroMQ

史上最快的消息队列系统

(4)Kafka

Apache下的一个子项目 。特点:高吞吐,在一台普通的服务器上既可以达到10W/s的吞吐速率;完全的分布式系统。适合处理海量数据。

Jms

JMS即Java消息服务(Java Message Service)应用程序接口,是一个Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。Java消息服务是一个与具体平台无关的API,绝大多数MOM提供商都对JMS提供支持。

所以,ActiveMq是Jms标准的一个实现者。

本篇通过对Mq消息消费、解析并组装发送为例讲述ActiveMq的使用过程。

Git地址:
Gitee

项目地址:
品茗IT-同步发布

品茗IT:提供在线快速构建Spring项目工具一站式Springboot项目生成

二、环境配置

2.1 ActiveMq的安装部署

因为消息队列是对生产者消息的存储和处理,并转发给消费者,所以消息队列是独立于生产者和消费者的一个服务。

ActiveMq可以在ActiveMq官网 下载并安装。

选择需要的版本,下载完成后,解压文件(window下用winrar/其他解压工具,linux下tar命令)并进入文件夹的bin目录下:

在这里插入图片描述

如图所示:

windows下可以在powershell下用:

./activemq start 或者./activemq.bat start

在cmd下只能用:

activemq.bat start

linux下可能要更改activemq文件的执行权限,然后:

./activemq start

启动后浏览器输入 http://127.0.0.1:8161/ 如图所示:

在这里插入图片描述

点击 Manage ActiveMQ broker 跳转到 http://127.0.0.1:8161/admin/; 可以在里面查看消息队列等信息及统计。

2.2 项目建立

本项目将消费者和生产者整合在一起,通过先消费掉mq中的json数据,通过数据中的标识找到对应的解析处理器,解析完成后组装并生产消息到mq的另一个队列中。

2.2.1 maven依赖

<?xml version="1.0"?>
<project
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
    xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>com.cff</groupId>
        <artifactId>springwork</artifactId>
        <version>0.0.1-SNAPSHOT</version>
    </parent>
    <artifactId>springwork-activemq</artifactId>
    <packaging>jar</packaging>
    <name>springwork-activemq</name>
    <url>http://maven.apache.org</url>
    <dependencies>
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-all</artifactId>
            <version>5.10.0</version>
        </dependency>
    </dependencies>
    <build>
        <finalName>springwork-activemq</finalName>
    </build>
</project>

父模块可以在点击查看获取。

2.2.2 spring-activemq.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:aop="http://www.springframework.org/schema/aop"
    xmlns:tx="http://www.springframework.org/schema/tx" xmlns:context="http://www.springframework.org/schema/context"
    xmlns:p="http://www.springframework.org/schema/p" xmlns:cache="http://www.springframework.org/schema/cache"
    xmlns:jms="http://www.springframework.org/schema/jms" xmlns:jaxws="http://cxf.apache.org/jaxws"
    xsi:schemaLocation="
                    http://www.springframework.org/schema/beans
                    http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
                    http://www.springframework.org/schema/tx 
                    http://www.springframework.org/schema/tx/spring-tx-4.0.xsd
                    http://www.springframework.org/schema/aop 
                    http://www.springframework.org/schema/aop/spring-aop-4.0.xsd
                    http://www.springframework.org/schema/context      
                    http://www.springframework.org/schema/context/spring-context-4.0.xsd
                    http://www.springframework.org/schema/cache 
                    http://www.springframework.org/schema/cache/spring-cache-4.0.xsd
                    http://www.springframework.org/schema/jms 
                    http://www.springframework.org/schema/jms/spring-jms-4.0.xsd   
                    http://cxf.apache.org/jaxws http://cxf.apache.org/schemas/jaxws.xsd ">

    <context:annotation-config />
    <context:component-scan base-package="com.cff.springwork">
    </context:component-scan>
    
    <bean id="annotationPropertyConfigurerJms"
        class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
        <property name="order" value="1" />
        <property name="ignoreUnresolvablePlaceholders" value="true" />
        <property name="locations">
            <list>
                <value>classpath:jms.properties</value>
            </list>
        </property>
    </bean>
    
    <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="${brokerURL}" />
    </bean>
    <bean id="connectionFactory"
        class="org.springframework.jms.connection.SingleConnectionFactory">
        <!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory -->
        <property name="targetConnectionFactory" ref="targetConnectionFactory" />
    </bean>

    <bean id="defaultQueueDestination" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg value="${destQueueName}" />
    </bean>
    <bean id="listenQueueDestination" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg value="${listenQueueName}" />
    </bean>

    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 -->
        <property name="connectionFactory" ref="connectionFactory" />
        <property name="defaultDestination" ref="defaultQueueDestination" />
    </bean>

    <bean id="mqMessageListener" class="com.cff.springwork.activemq.handler.MqMessageConsumer">
        <property name="handler" ref="jsonParseHander" />
    </bean>
    
    <bean id="jsonParse1001" class="com.cff.springwork.activemq.parser.JsonParse1001">
    </bean>

    <bean id="jsonParseHander" class="com.cff.springwork.activemq.handler.bussiness.JsonParseHander">
        <property name="nextHandler" ref="bussiNessHander" />
        <property name="parsers">
            <map>
                <entry key="1001" value-ref="jsonParse1001" />
            </map>
        </property>
    </bean>
    
    <bean id="bussiNessHander" class="com.cff.springwork.activemq.handler.bussiness.BussiNessHander">
        <property name="nextHandler" ref="mqMessageProducer" />
    </bean>
    
    <bean id="mqMessageProducer" class="com.cff.springwork.activemq.handler.MqMessageProducer">
        <property name="jmsTemplate" ref="jmsTemplate" />
    </bean>
    <!-- 消息监听容器 -->
    <bean id="jmsContainer"
        class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="concurrency" value="5-10"></property>
        <property name="connectionFactory" ref="connectionFactory" />
        <property name="destination" ref="listenQueueDestination" />
        <property name="messageListener" ref="mqMessageListener" />
        <property name="sessionTransacted" value="true"/>  
    </bean>

    

</beans>

这里面的配置可以分为连接、队列、发送、监听、容器、业务bean这几类:

jms.properties配置文件:

brokerURL=tcp://localhost:61616
destQueueName=destQueue
listenQueueName=listenQueue

2.2.3 监听器(消费者)

前面我们定义了mqMessageListener监听器,其实就是消费者,用以接收监听队列中的消息。

MqMessageConsumer :

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

import com.cff.springwork.activemq.handler.bussiness.Handler;
import com.cff.springwork.activemq.model.JmsMessage;

public class MqMessageConsumer implements MessageListener{
    private Handler handler;
    public void onMessage(Message message) {
        TextMessage textMsg = (TextMessage) message;   
        System.out.println("接收到一个纯文本消息。");   
        try {   
            System.out.println("消息内容是:" + textMsg.getText());   
        } catch (JMSException e) {   
            e.printStackTrace();   
        }   
        JmsMessage<String> jm = new JmsMessage<String>();
        try {
            jm.setBody(textMsg.getText());
        } catch (JMSException e) {
            e.printStackTrace();
        }
        handler.hander(jm);
        
    }
    public Handler getHandler() {
        return handler;
    }
    public void setHandler(Handler handler) {
        this.handler = handler;
    }
    
} 

2.2.4 发送方(生产者)

前面定以的bean中,mqMessageProducer是负责发送消息的,即生产消息的,发送消息放在发送的目的消息队列中,队列是如何确定的呢?在我们定义的容器bean中。

MqMessageProducer:

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;

import com.cff.springwork.activemq.model.JmsMessage;

import net.sf.json.JSONObject;

public class MqMessageProducer implements Handler{
    private JmsTemplate jmsTemplate;
    public static String RETCODESUCCESS="200";
    
    public void hander(JmsMessage msg) {
        JSONObject jSONObject = new JSONObject();
        jSONObject.put("msgType", msg.getMsgType());
        jSONObject.put("orderNo", msg.getOrderNo());
        jSONObject.put("retCode", RETCODESUCCESS);
        
        System.out.println("准备发送jms到["+jmsTemplate.getDefaultDestination()+"]...");             

        jmsTemplate.send(new MessageCreator() {
            public Message createMessage(Session session) throws JMSException {
                TextMessage textmsg = session.createTextMessage();  
                textmsg.setText(jSONObject.toString());
                return textmsg;
            }
        });
        
    }

    public JmsTemplate getJmsTemplate() {
        return jmsTemplate;
    }

    public void setJmsTemplate(JmsTemplate jmsTemplate) {
        this.jmsTemplate = jmsTemplate;
    }
    
}

2.2.5 业务逻辑

我们让业务逻辑的处理器都实现Handler接口:

Handler:

import com.cff.springwork.activemq.model.JmsMessage;

public interface Handler {
    public void hander(JmsMessage msg);
}

监听器首先调用的是JsonParseHander,对Json数据进行解析。

JsonParseHander:

import java.util.Map;

import com.cff.springwork.activemq.model.JmsMessage;
import com.cff.springwork.activemq.parser.JsonParser;

import net.sf.json.JSONObject;

public class JsonParseHander implements Handler{
    private Handler nextHandler;
    private Map<String,JsonParser> parsers;
    
    @Override
    public void hander(JmsMessage msg) {
        String body = (String) msg.getBody();
        JSONObject json = JSONObject.fromObject(body);
        String msgType = json.getString("msgType");
        JsonParser jp = parsers.get(msgType);
        System.out.println("消息类型为:" + msgType + "处理类:" + jp.getClass().getName());   

        JmsMessage jm = jp.parse(body);
        System.out.println("消息为:" + jm.toString() );  
        if(nextHandler!=null){
            System.out.println("nextHandler为:" + nextHandler.getClass().getName());  
            nextHandler.hander(jm);
        }else{
            System.out.println("nextHandler为空");  
        }
    }

    public Map<String, JsonParser> getParsers() {
        return parsers;
    }

    public void setParsers(Map<String, JsonParser> parsers) {
        this.parsers = parsers;
    }

    public Handler getNextHandler() {
        return nextHandler;
    }

    public void setNextHandler(Handler nextHandler) {
        this.nextHandler = nextHandler;
    }
}

在spring的配置文件中,我们配置的责任链的下一个Handler是BussiNessHander。

BussiNessHander:

import com.cff.springwork.activemq.model.JmsMessage;

public class BussiNessHander implements Handler{
    private Handler nextHandler;
    
    @Override
    public void hander(JmsMessage msg) {
        System.out.println(msg.getBody().toString());
        if(nextHandler!=null){
            nextHandler.hander(msg);
        }
    }
    

    public Handler getNextHandler() {
        return nextHandler;
    }

    public void setNextHandler(Handler nextHandler) {
        this.nextHandler = nextHandler;
    }
}

业务逻辑处理完成后,我们将消息发送到另一个队列上,这时候,我们可以将mqMessageProducer实现Handler接口,这样我们的责任链的下一个就是mqMessageProducer(2.2.4定义的发送方)。

2.2.5 Json数据处理

因为本篇是对消息队列监听,假设传输的业务数据是Json,而且有多种业务,那样每个业务的Json数据格式都不一样,比如我们的JsonParseHander是根据Json数据中的msgType,拿到JsonParseHander中的注入的map中jsonParse${msgType}对应的bean。

这里我们先定义一个接口JsonParser:

JsonParser:

import com.cff.springwork.activemq.model.JmsMessage;

public interface JsonParser {
    public JmsMessage parse(String msg);
}

然后定义一个jsonParser,JsonParse1001。

详细完整代码(实体类和Json处理实现类)可以在https://www.pomit.cn/p/178648639553792查看

快速构建项目

Spring组件化构建

喜欢这篇文章么,喜欢就加入我们一起讨论Spring技术吧!


品茗IT交流群
上一篇下一篇

猜你喜欢

热点阅读