记录一次ActiveMQ的学习记录

2022-04-22  本文已影响0人  曹大大

ActiveMQ入门

异步处理
应用解耦
流量削峰

异步处理

场景说明:用户注册,需要执行三个业务逻辑,分别为写入用户表,发注册邮件以及注册短信。


串行方式

将注册信息写入数据库成功后,发送注册邮件,再发送注册短信。以上三个任务全部完成后,返回给客户端。

串行方式.png

并行方式

将注册信息写入数据库成功后,发送注册邮件的同时,发送注册短信。以上三个以上三个任务完成后,返回给客户端,与串行的差别是,并行的方式可以提高处理的效率。

并行处理.png

异步处理

引入消息中间件,将部分的业务逻辑,进行异步处理。改造后的架构如下:

引入消息中间件.png

按照以上约定,用户的响应时间相当于是注册信息写入数据库的时间,也就是50毫秒。注册邮件,发送短信写入消息队列后,直接返回,因此写入消息队列的速度很快,基本可以忽略,因此用户的响应时间可能是50毫秒。因此架构改变后,系统的吞吐量提高啦,比串行提高了3倍,比并行提高了两倍。

应用解耦

场景说明:用户下单后,订单系统需要通知库存系统。

传统的做法是,订单系统调用库存系统的接口。如下图:

传统做法.png

传统模式的缺点:假如库存系统无法访问,则订单减库存将失败,从而导致订单失败,订单系统与库存系统耦合。如何解决以上问题呢?引入应用消息队列后的方案,如下图:

消息队列.png

订单系统:用户下单后,订单系统完成持久化处理,将消息写入消息队列,返回用户订单下单成功 库存系统:订阅下单的消息,采用拉/推的方式,获取下单信息,库存系统根据下单信息,进行库存操作 假如:在下单时库存系统不能正常使用。也不影响正常下单,因为下单后,订单系统写入消息队列就不再关心其他的后续操作了。实现订单系统与库存系统的应用解耦。

流量消峰

流量削锋也是消息队列中的常用场景,一般在秒杀或团抢活动中使用广泛。应用场景:秒杀活动,一般会因为流量过大,导致流量暴增,应用挂掉。为解决这个问题,一般需要在应用前端加入消息队列。通过加入消息队列完成如下功能:
a、可以控制活动的人数
b、可以缓解短时间内高流量压垮应用

秒杀业务.png

用户的请求,服务器接收后,首先写入消息队列。假如消息队列长度超过最大数量,则直接抛弃用户请求或跳转到错误页面。秒杀业务根据消息队列中的请求信息,再做后续处理

常见的消息中间件产品对比

特性 ActiveMQ RabbitMq RocketMQ Kafka
开发语言 Java Erlang Java Scala
单击吞吐量 万级 万级 10万级 10万级
实效性 毫秒级 微秒级 毫秒级 毫秒级
可用性 高(支持主从架构) 高(支持主从架构) 非常高(分布式架构) 非常高(分布式架构)
功能性 成熟的产品,在很多公司得到应用;有较多的文档;各种协议支持的较好 基于erlang开发,所以并发能力很强,性能及其好,延时很低,管理界面丰富 MQ功能比较完备,扩展性佳 像一些消息查询,消息回溯等功能没有提供,在大数据领域应用广泛

什么是ActiveMQ?

官网: http://activemq.apache.org/

ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现。

什么是JMS?

消息中间件利用高效可靠的消息传递机制进行平台无关的数据交流,并基于数据通信来进行分布式系统的集成。它可以在分布式环境下扩展进程间的通信。对于消息中间件,常见的角色大致也就有Producer(生产者)、Consumer(消费者)。

消息队列中间件是分布式系统中重要的组件,主要解决应用解耦,异步消息,流量削锋等问题,实现高性能,高可用,可伸缩和最终一致性架构。

JMS(Java Messaging Service)是Java平台上有关面向消息中间件的技术规范,它便于消息系统中的Java应用程序进行消息交换,并且通过提供标准的产生、发送、接收消息的接口简化企业应用的开发。JMS本身只定义了一系列的接口规范,是一种与厂商无关的 API,用来访问消息收发系统。它类似于JDBC(java DatabaseConnectivity):这里,JDBC 是可以用来访问许多不同关系数据库的 API,而 JMS则提供同样与厂商无关的访问方法,以访问消息收发服务。许多厂商目前都支持 JMS,包括 IBM 的MQSeries、BEA的 Weblogic JMS service和 Progress 的 SonicMQ,这只是几个例子。 JMS 使您能够通过消息收发服务(有时称为消息中介程序或路由器)从一个 JMS 客户机向另一个 JML 客户机发送消息。消息是 JMS 中的一种类型对象,由两部分组成:报头和消息主体。报头由路由信息以及有关该消息的元数据组成。消息主体则携带着应用程序的数据或有效负载。

JMS消息模式

消息中间件一般有两种传递模式:点对点模式(P2P)和发布订阅模式(Pub/Sub)。

(1)P2P(Point to Point)点对点模型(Queue队列模型)

(2)Publish/Subscribe(PUB/SUB)发布、定于模型(Topic主题模型)

点对点模型(Pointer-to-Pointer):即生产者和消费者之间的消息往来。每个消息都被发送到特定的消息队列,接收者从队列中获取消息。队列保留着消息,直到他们被消费或超时。

点对点模式.png

点对点模型的特点:

发布/订阅(Publish-Subscribe)

包含三个角色:主题(Topic),发布者(Publisher),订阅者(Subscriber),多个发布者将消息发送到topic,系统将这些消息投递到订阅此topic的订阅者。
发布者发送到topic的消息,只有订阅了topic的订阅者才会收到消息。topic实现了发布和订阅,当你发布一个消息,所有订阅这个topic的服务都能得到这个消息,所以从1到N个订阅者都能得到这个消息的拷贝。

发布和订阅.png

发布/订阅模型的特点:

原生JMS API操作ActiveMQ

    <dependencies>
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-all</artifactId>
            <version>5.11.2</version>
        </dependency>
    </dependencies>

步骤

1.创建连接工厂 2.创建连接 3.打开连接 4.创建session 5.创建目标地址(Queue:点对点消息,Topic:发布订阅消息) 6.创建消息生产者 7.创建消息 8.发送消息 9.释放资源

消息的发送者 点对点

package com.czy.producer;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;
import javax.management.Query;

/**
 * @ProjectName: jms-producer
 * @Package: com.czy.producer
 * @ClassName: PTPT_Producer
 * @Author: 曹振远
 * @Description: 点对点模式--消息生产者
 * @Date: 2021/6/10 16:50
 * @Version: 1.0
 */
public class PTPT_Producer {

    public static void main(String[] args) throws Exception {
        //1.创建连接工厂
        ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://127.0.0.1:61616");
        //2.创建连接
        Connection connection = factory.createConnection();
        //3.打开连接
        connection.start();
        //4.创建Session 参数1:是否开启事务  参数二:消息确认机制
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //5.创建目标地址(Queue/Topic)
        Queue queue = session.createQueue("queue01");
        //6.创建消息生产者
        MessageProducer producer = session.createProducer(queue);
        //7.创建消息
        TextMessage testMessage = session.createTextMessage("test Message");
        //8.发送消息
        producer.send(testMessage);
        System.out.println("发送消息成功");
        //9.释放资源
        session.close();
        connection.close();
    }
}

消息消费者 点对点

package com.czy.consumer;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

/**
 * @ProjectName: jms-producer
 * @Package: com.czy.consumer
 * @ClassName: PTP_Consumer
 * @Author: 曹振远
 * @Description: 点对点模式--消息消费者
 * @Date: 2021/6/11 10:39
 * @Version: 1.0
 */
public class PTP_Consumer {
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://127.0.0.1:61616");
        Connection connection = factory.createConnection();
        connection.start();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Queue queue = session.createQueue("queue01");

        MessageConsumer consumer = session.createConsumer(queue);

        while (true) {
            Message message = consumer.receive();
            if (message == null) {
                break;
            }else{
                //如果还有消息,判断是什么类型
                if(message instanceof TextMessage){
                    TextMessage textMessage = (TextMessage) message;
                    System.out.println("接受到的消息:"+textMessage);
                }
            }
        }

    }
}

消费者2 --监听器

package com.czy.consumer;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

/**
 * @ProjectName: jms-producer
 * @Package: com.czy.consumer
 * @ClassName: PTP_Consumer
 * @Author: 曹振远
 * @Description: 点对点模式--消息消费者监听模式
 * @Date: 2021/6/11 10:39
 * @Version: 1.0
 */
public class PTP_Consumer2 {
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://127.0.0.1:61616");
        Connection connection = factory.createConnection();
        connection.start();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Queue queue = session.createQueue("queue01");

        MessageConsumer consumer = session.createConsumer(queue);

        //设置消息监听器
        consumer.setMessageListener(new MessageListener() {
            //处理逻辑
            @Override
            public void onMessage(Message message) {
                if(message instanceof TextMessage){
                    TextMessage textMessage = (TextMessage) message;
                    System.out.println(textMessage);
                }
            }
        });

    }
}

发布订阅消息生产者

package com.czy.producer;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

/**
 * @ProjectName: jms-producer
 * @Package: com.czy.producer
 * @ClassName: PTPT_Producer
 * @Author: 曹振远
 * @Description: 发布订阅模式--消息生产者
 * @Date: 2021/6/10 16:50
 * @Version: 1.0
 */
public class PS_Producer {

    public static void main(String[] args) throws Exception {
        //1.创建连接工厂
        ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://127.0.0.1:61616");
        //2.创建连接
        Connection connection = factory.createConnection();
        //3.打开连接
        connection.start();
        //4.创建Session 参数1:是否开启事务  参数二:消息确认机制
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //5.创建目标地址(Queue/Topic)
        Topic topic = session.createTopic("topic01");
        //6.创建消息生产者
        MessageProducer producer = session.createProducer(topic);
        //7.创建消息
        TextMessage testMessage = session.createTextMessage("test Message-topic");
        //8.发送消息
        producer.send(testMessage);
        System.out.println("发送消息成功");
        //9.释放资源
        session.close();
        connection.close();
    }
}

发布订阅消息消费者

package com.czy.consumer;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

/**
 * @ProjectName: jms-producer
 * @Package: com.czy.consumer
 * @ClassName: PTP_Consumer
 * @Author: 曹振远
 * @Description: 发布订阅模式--消息消费者监听模式
 * @Date: 2021/6/11 10:39
 * @Version: 1.0
 */
public class PS_Consumer {
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://127.0.0.1:61616");
        Connection connection = factory.createConnection();
        connection.start();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Topic topic = session.createTopic("topic01");

        MessageConsumer consumer = session.createConsumer(topic);

        //设置消息监听器
        consumer.setMessageListener(new MessageListener() {
            //处理逻辑
            @Override
            public void onMessage(Message message) {
                if(message instanceof TextMessage){
                    TextMessage textMessage = (TextMessage) message;
                    System.out.println(textMessage);
                }
            }
        });

    }
}

spring整合ActiveMq

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.itheima</groupId>
    <artifactId>spring_producer</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-all</artifactId>
            <version>5.11.2</version>
        </dependency>


        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-core</artifactId>
            <version>5.0.2.RELEASE</version>
        </dependency>

        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-web</artifactId>
            <version>5.0.2.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-oxm</artifactId>
            <version>5.0.2.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-tx</artifactId>
            <version>5.0.2.RELEASE</version>
        </dependency>

        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-jdbc</artifactId>
            <version>5.0.2.RELEASE</version>
        </dependency>

        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-webmvc</artifactId>
            <version>5.0.2.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-aop</artifactId>
            <version>5.0.2.RELEASE</version>
        </dependency>

        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-context-support</artifactId>
            <version>5.0.2.RELEASE</version>
        </dependency>

        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-test</artifactId>
            <version>5.0.2.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-jms</artifactId>
            <version>5.0.2.RELEASE</version>
        </dependency>

        <dependency>
            <groupId>javax.jms</groupId>
            <artifactId>javax.jms-api</artifactId>
            <version>2.0.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.xbean</groupId>
            <artifactId>xbean-spring</artifactId>
            <version>3.7</version>
        </dependency>

        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
        </dependency>
    </dependencies>
</project>

applicationContext.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:amp="http://activemq.apache.org/schema/core"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">

    <!--1.创建连接工厂对象-->
    <amp:connectionFactory
            id="connetionFactory"
            brokerURL="tcp://127.0.0.1:61616"
            userName="admin"
            password="admin"
    />

    <!--2.创建缓存连接工厂-->
    <bean id="cachingConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
        <!--注入连接工厂-->
        <property name="targetConnectionFactory" ref="connetionFactory"/>
        <!--缓存消息数据-->
        <property name="sessionCacheSize" value="5"/>
    </bean>

    <!--3.创建用于点对点发送的JmsTemplate-->
    <bean id="jmsQueueTemplate" class="org.springframework.jms.core.JmsTemplate">
        <!--注入缓存连接工厂-->
        <property name="connectionFactory" ref="cachingConnectionFactory"/>
        <!--指定是否为发布订阅模式-->
        <property name="pubSubDomain" value="false"/>
    </bean>

    <!--4.创建用于发布订阅发送的JmsTemplate-->
    <bean id="jmsTopicTemplate" class="org.springframework.jms.core.JmsTemplate">
        <!--注入缓存连接工厂-->
        <property name="connectionFactory" ref="cachingConnectionFactory"/>
        <!--指定是否为发布订阅模式-->
        <property name="pubSubDomain" value="true"/>
    </bean>
</beans>
package com.czy.producer;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;

/**
 * @ProjectName: jms-producer
 * @Package: com.czy.producer
 * @ClassName: SpringProducer
 * @Author: 曹振远
 * @Description:
 * @Date: 2021/6/11 14:11
 * @Version: 1.0
 */
@RunWith(SpringJUnit4ClassRunner.class) // junit与spring整合
@ContextConfiguration("classpath:applicationContext.xml") // 加载spring配置文件
public class SpringProducer {

    //点对点模式
    @Autowired
    @Qualifier("jmsQueueTemplate")
    private JmsTemplate jmsQueueTemplate;

    //发布订阅模式
    @Autowired
    @Qualifier("jmsTopicTemplate")
    private JmsTemplate jmsTopicTemplate;

    /**
     * 点对点的发送
     * 第一个参数是:指定队列名称
     * 第二个参数是:MessageCreator接口
     */
    @Test
    public void ptpSends(){
        jmsQueueTemplate.send("spring_Queue", new MessageCreator() {
            @Override
            public Message createMessage(Session session) throws JMSException {
                return session.createTextMessage("spring text Message");
            }
        });
        System.out.println("发送消息成功");
    }

    @Test
    public void psSends(){
        jmsTopicTemplate.send("topic01", new MessageCreator() {
            @Override
            public Message createMessage(Session session) throws JMSException {
                return session.createTextMessage("spring text Message");
            }
        });
    }
}

消费者

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:amp="http://activemq.apache.org/schema/core"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:jms="http://www.springframework.org/schema/jms"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms.xsd">

<!--1.创建连接工厂对象-->
    <amp:connectionFactory
            id="connetionFactory"
            brokerURL="tcp://127.0.0.1:61616"
            userName="admin"
            password="admin"
    />

    <!--2.创建缓存连接工厂-->
    <bean id="cachingConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
        <!--注入连接工厂-->
        <property name="targetConnectionFactory" ref="connetionFactory"/>
        <!--缓存消息数据-->
        <property name="sessionCacheSize" value="5"/>
    </bean>

    <!--3.配置监听扫描-->
    <context:component-scan base-package="com.czy.listener"/>

    <!--4.配置监听器(点对点)-->
    <jms:listener-container connection-factory="cachingConnectionFactory"
    destination-type="queue">
        <jms:listener destination="spring_Queue" ref="queueListener"/>
    </jms:listener-container>

    <!--5.配置监听器(发布订阅)-->
    <jms:listener-container connection-factory="cachingConnectionFactory"
                            destination-type="topic">
        <jms:listener destination="spring_Topic01" ref="topicListener"/>
    </jms:listener-container>
</beans>
package com.czy.listener;

import org.springframework.stereotype.Component;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

/**
 * @ProjectName: jms-producer
 * @Package: com.czy.listener
 * @ClassName: QueueListener
 * @Author: 曹振远
 * @Description:
 * @Date: 2021/6/11 15:09
 * @Version: 1.0
 */
@Component
public class QueueListener implements MessageListener {

    @Override
    public void onMessage(Message message) {
        TextMessage textMessage = (TextMessage) message;
        try {
            System.out.println("Queue:"+textMessage.getText());
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

package com.czy.listener;

import org.springframework.stereotype.Component;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

/**
 * @ProjectName: jms-producer
 * @Package: com.czy.listener
 * @ClassName: TopicListener
 * @Author: 曹振远
 * @Description:
 * @Date: 2021/6/11 15:14
 * @Version: 1.0
 */
@Component
public class TopicListener implements MessageListener {
    @Override
    public void onMessage(Message message) {
        TextMessage textMessage = (TextMessage) message;
        try {
            System.out.println("topic:"+textMessage.getText());
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

启动类

package com.czy.consumer;

import org.apache.xbean.spring.context.ClassPathXmlApplicationContext;

import java.io.IOException;

/**
 * @ProjectName: jms-producer
 * @Package: com.czy.consumer
 * @ClassName: SpringConsumer
 * @Author: 曹振远
 * @Description:
 * @Date: 2021/6/11 15:16
 * @Version: 1.0
 */
public class SpringConsumer {
    public static void main(String[] args) throws IOException {
        ClassPathXmlApplicationContext cxt = new ClassPathXmlApplicationContext("classpath:applicationContext.xml");

        cxt.start();

        System.in.read();
    }
}

springboot整合activeMQ

生产者

 <!--springboot父工程:锁定springboot的版本及其整合框架的版本-->
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.1.RELEASE</version>
        <relativePath/>
    </parent>

    <!--导入所需依赖-->
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <!--springboot与ActiveMQ的整合依赖-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-activemq</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

application.yml

server:
  port: 9001

spring:
  application:
    name: activeMQ-producer

  activemq:
    broker-url: tcp://127.0.0.1:61616
    user: admin
    password: admin

   #指定发布模式,false是点对点,true是发布订阅
  jms:
    pub-sub-domain: false

test

package com.czy.producer;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.test.context.junit4.SpringRunner;

/**
 * @ProjectName: jms-producer
 * @Package: com.czy.producer
 * @ClassName: SpringBootProducer
 * @Author: 曹振远
 * @Description:
 * @Date: 2021/6/11 15:45
 * @Version: 1.0
 */
@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringBootProducer {

    @Autowired
    private JmsMessagingTemplate jmsMessagingTemplate;

    @Test
    public void ptpSends(){
        jmsMessagingTemplate.convertAndSend("springboot-queue","你好mq");
        System.out.println("发送消息成功");
    }
}

消费者

server:
  port: 9002

spring:
  application:
    name: activeMq-Consumer

  activemq:
    broker-url: tcp://127.0.0.1:61616
    user: admin
    password: admin

  jms:
    pub-sub-domain: false
package com.czy.listener;

import org.apache.activemq.Message;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;


import javax.jms.JMSException;
import javax.jms.TextMessage;

/**
 * @ProjectName: jms-producer
 * @Package: com.czy.listener
 * @ClassName: QueueListener
 * @Author: 曹振远
 * @Description:
 * @Date: 2021/6/11 15:56
 * @Version: 1.0
 */
@Component
public class QueueListener {

    @JmsListener(destination = "springboot-queue")
    public void receiveMessage(Message message) throws JMSException {
        if(message instanceof TextMessage){
            TextMessage textMessage = (TextMessage) message;
            System.out.println(textMessage.getText());
        }
    }
}

ActiveMQ高级

JMS消息组成

结构 描述
JMS Provider 消息中间件、消息服务器
JMS Producer 消息生产者
JMS Consumer 消息消费者
JMS Message 消息(重要)

JMS Message消息由三部分组成

JMS消息头

JMS消息头预定了若干字段用户用于客户端与JMS提供者之间识别和发送消息,预编译头如下:

名称 描述
<span style="color:red">JMSDestination</span> 消息发送的Destination,在发送过程中由提供者设置,发送到哪里(队列)
<span style="color:red">JMSMessageID</span> 唯一标识提供者发送的每一个消息,这个字段实在发送过程中由提供者设置的,客户机只能在消息发送后才能确定消息的MessageID
<span style="color:red">JMSDeliveryMode</span> 消息持久化。包含DeliveryMode.PERSISTENT或者DeliveryMode.NON_PERSISTENT
JMSTimestamp 提供者发送消息的时间,由提供者在发送过程中设置
<span style="color:red">JMSExpiration</span> 消息失效时间,毫秒,值0表明消息不会过期,默认是0
<span style="color:red">JMSPriority</span> 消息的优先级,由提供者在发送过程中设置。优先级0的优先级最低,优先级9的优先级最高。0-4为普通消息,5-9为加急消息。ActiveMQ不保证优先级高就一定先发送,只保证了加急消息必须先于普通消息发送。默认是4
<span style="color:red">JMSCorrelationID</span> 通常用来链接响应消息与请求消息,由消息的JJMS程序设置
JMSSReplyTo 请求程序用它来指出回复消息应发送的地方,由发送消息的JMS程序设置
JMSType JMS程序用来指出消息的类型
JMSRedelivered 消息的重发标志,false,代表该消息时第一次发生,true,代表消息

不过需要注意的是,在传送消息时,消息头的值是由JMS提供者来设置的,因此开发者使用以上setJMSxxx()方法分配的值就被忽略了,只有以下几个值是可以由开发者设置的:

JMSCorrelationID、JMSRepltTo、JMSType


JMS消息体

在消息体中,JMS API定义了五种类型的消息格式

TextMessage--一个字符串对象
MapMessage --键值对
ObjectMessage --一个序列化的Java对象,注意对象必须序列化,5.12后需要添加信任列表
BytesMessage --一个字节的数据流
StreamMessage --Java原始值的数据流
spring:
  application:
    name: activeMq-Consumer

  activemq:
    broker-url: tcp://127.0.0.1:61616
    user: admin
    password: admin
    packages:
      trust-all: true #让ActiveMQ信任全部的自定义对象,实现对象的序列化

JMS消息属性

我们可以给消息设置自定义属性,这些属性主要是提供给应用程序的,对于实现消息过滤功能和标记功能,消息属性非常有用,JMS API定义了一些标准属性,JMS服务提供者可以选择的提供部分标准属性。

message.setStringPropertie("Property",property);//自定义属性

消息持久化

消息持久化是保证消息不丢失的重要方法!!!

ActiveMQ提供了以下三种的消息存储模式:

  1. Memory消息存储-基于内存的存储方式。
  2. 基于日志消息存储方式,KahaDB是ActiveMQ的默认日志存储方式,它是提供了容量的提升和恢复能力。
  3. 基于JDBC的消息存储方式--数据存储于数据库(例如MySQL中)。
ActiveMQ持久化.png ActiveMQ持久化-消费者.png

为什么要移除呢?

因为持久化不移除的话,有可能会重复发送消息。

不持久化

server:
  port: 9001

spring:
  application:
    name: activeMQ-producer

  activemq:
    broker-url: tcp://127.0.0.1:61616
    user: admin
    password: admin

   #指定发布模式,false是点对点,true是发布订阅
  jms:
    pub-sub-domain: false
    template:
      delivery-mode: non_persistent #非持久化(把消息存储在内存里面)

kahaDB日志存储持久化

server:
  port: 9001

spring:
  application:
    name: activeMQ-producer

  activemq:
    broker-url: tcp://127.0.0.1:61616
    user: admin
    password: admin

   #指定发布模式,false是点对点,true是发布订阅
  jms:
    pub-sub-domain: false
    template:
      delivery-mode: persistent #存储在日志文件里面

JDBC持久化

server:
  port: 9001

spring:
  application:
    name: activeMQ-producer

  activemq:
    broker-url: tcp://127.0.0.1:61616
    user: admin
    password: admin

   #指定发布模式,false是点对点,true是发布订阅
  jms:
    pub-sub-domain: false
    template:
      delivery-mode: persistent #存储在日志文件里面

修改activemq.xml

<!--配置数据库连接池--> 
<bean name="mysql-ds" class="com.alibaba.druid.pool.DruidDataSource" destroy-method="close">
<property name="driverClassName" value="com.mysql.jdbc.Driver" />
<property name="url" value="jdbc:mysql://192.168.66.133:3306/db_activemq" /> <property name="username" value="root" />
<property name="password" value="123456"/>
</bean>
<!--JDBC Jdbc用于master/slave模式的数据库分享 -->
<persistenceAdapter> 
<jdbcPersistenceAdapter dataSource="#mysql-ds"/>
</persistenceAdapter>
  1. 拷贝mysql及durid数据源的jar包到activemq的lib目录下。

4)重启activemq。

消息事务

消息事务,是保证消息传递原子性的一个重要的特征,和JDBC的事务特征类似

一个事务性发送,其中一组消息要么能够全部成功保证到达服务器,要么都不到达服务器,

生产者,消费者与消息服务器直接都支持事务性

ActiveMQ的事务主要偏向于生产者的应用

ActiveMQ消息事务流程图:

ActiveMQ消息事务流程图.png

实现:生产者

package com.czy.config;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.connection.JmsTransactionManager;
import org.springframework.transaction.PlatformTransactionManager;

import javax.jms.ConnectionFactory;

/**
 * @ProjectName: jms-producer
 * @Package: com.czy.config
 * @ClassName: ActiveMQConfig
 * @Author: 曹振远
 * @Description:
 * @Date: 2021/6/15 14:45
 * @Version: 1.0
 */
@Configuration
public class ActiveMQConfig {

    /**
     * @Author: 曹振远
     * @Description: 添加JMS事务管理器
     * @Date: 14:47 2021/6/15
     */
    @Bean
    public PlatformTransactionManager createTransactionManager(ConnectionFactory connectionFactory){
        return new JmsTransactionManager(connectionFactory);
    }
}

@Service
@Transactional //这个注解不但可以实现消息的事务,也可以解决数据库的事务操作
public class MessageService {

    @Autowired
    private JmsTemplate jmsTemplate;

    public void sendMessage() {
        for (int i = 0; i <= 10; i++) {
            int a = 10 / 0;
        }
        jmsTemplate.convertAndSend("spring-demo","testMQ");
    }
}

消费者实现

package com.czy.listener;

import org.apache.activemq.Message;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;


import javax.jms.JMSException;
import javax.jms.Session;
import javax.jms.TextMessage;

/**
 * @ProjectName: jms-producer
 * @Package: com.czy.listener
 * @ClassName: QueueListener
 * @Author: 曹振远
 * @Description:
 * @Date: 2021/6/11 15:56
 * @Version: 1.0
 */
@Component
public class QueueListener {

    @JmsListener(destination = "springboot-queue")
    public void receiveMessage(Message message, Session session){
        if(message instanceof TextMessage){
            TextMessage textMessage = (TextMessage) message;
            try {
                System.out.println(textMessage.getText());
                session.commit();//提交
            } catch (JMSException e) {
                e.printStackTrace();
                try {
                    session.rollback();//一旦事务回滚,MQ会重发消息,一共重发6次
                } catch (JMSException e1) {
                    e1.printStackTrace();
                }
            }
        }
    }
}

消息确认机制

JMS消息只有在被确认之后,才认为已经被成功地消费了。消息的成功消费通常包括三个阶段:客户接收消息,客户处理消息和消息被确认。在事务性会话中,当一个事务被提交的时候,确认自动发生,在非事务性会话中,消息何时被确认取决于创建会话的应答模式。该参数有以下三个可能:

描述
Session.AUTO_ACKNOWLEDGE 当客户成功的从receive方法返回的时候,或者从MessageListener.onMessage方法返回的时候,会话自动确认客户收到的消息
Session.CLENT_ACKNOWLEDGE 客户通过消息的acknowledge方法确认消息。需要注意的是,在这种模式中,确定是在会话层上进行,确认一个被消费的消息将自动确认所有已被会话消费的消息,例如,如果一个消费者消费了10个消息,然后确认第五个消息,那么所有10个消息都被确认
Session.DUPS_ACKNOWLEDGE 该选择只是会话迟钝消息的提交,如果JMS provider失败,那么可能会导致一些重复的消息。如果是重复的消息,那么JMS proovider必须把消息头的JMSRedelivered设置为true

注意:消息确认机制和事务机制是冲突的,只能选其中一个


ActiveMQ消费方的消费确认机制.png

消费方配置类

package com.czy.config;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;

import javax.jms.ConnectionFactory;

/**
 * @ProjectName: jms-producer
 * @Package: com.czy.config
 * @ClassName: AriveMQConfig
 * @Author: 曹振远
 * @Description:
 * @Date: 2021/6/15 16:53
 * @Version: 1.0
 */
@Configuration
public class AriveMQConfig {

    @Bean("jmsListenerContainerFactory")
    public DefaultJmsListenerContainerFactory defaultJmsListenerContainerFactory(ConnectionFactory connectionFactory){
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        //关闭事务(重要)
        factory.setSessionTransacted(false);
        //修盖消息确认机制,springboot整合ActiveMQ后,手动确认是4
        factory.setSessionAcknowledgeMode(4);
        return factory;
    }
}

package com.czy.listener;

import org.apache.activemq.Message;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;


import javax.jms.JMSException;
import javax.jms.Session;
import javax.jms.TextMessage;

/**
 * @ProjectName: jms-producer
 * @Package: com.czy.listener
 * @ClassName: QueueListener
 * @Author: 曹振远
 * @Description:
 * @Date: 2021/6/11 15:56
 * @Version: 1.0
 */
@Component
public class QueueListener {

    @JmsListener(destination = "springboot-queue",containerFactory = "jmsListenerContainerFactory")
    public void receiveMessage(Message message){
        if(message instanceof TextMessage){
            TextMessage textMessage = (TextMessage) message;
            try {
                System.out.println(textMessage.getText());
                message.acknowledge();//手动确认接收消息
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }
}

消息的投递方式

1.异步投递vs同步投递

同步:消息生产者使用持久传递模式发送消息的时候,Producer.send()方法会被阻塞,直到broker发送一个确认消息给生产者ProducerAck,这个确认消息暗示Broker已经成功接收到消息并把消息保存到二级存储中。

异步:如果应用程序能够容忍一些消息的丢失,那么可以使用异步发送,异步发送不会在收到broker的确认之前一直阻塞Producer.send方法想要使用异步,在brokerURL中增加jms.alwaysSycSend=false&jms.useAsyncSend=true属性。

  1. 如果设置了alwaysSycSend=true,系统将会忽略useAsyncSend设置的值都采用同步。
  2. 当alwaysSycSend=false时,“non_persistent”(非持久化)、事务中的消息将使用“异步发送”。
  3. 当alwaysSycSend=false时,如果指定了useAsyncSend=true,“persistent”类型的消息使用异步发送,如果useAsyncSend=false,“persistent”类型的消息使用的同步发送。

总计:默认情况jms.alwaysSycSend=false&jms.useAsyncSend=false,非持久化消息、事务内的消息均采用异步发送:对持久化的消息采用同步发送。

2.配置异步投递
//1.在连接上配置
new ActiveMQConnertionFactory("tcp://localhost:61616?jms.useAsyncSend=true")
    
//2.通过ConnectionFactory
(ActiveMQConnertionFactory)connertionFactory.setUserAsyncSend(true);

//3.通过connection
(ActiveMQConnertionFactory)connection.setUserAsyncSend(true);

注意:Spring和SpringBoot项目,通过修改JmsTemplate的默认参数实现异步或同步投递。

 /**
     *  异步发送非持久化JmsTemplate
     * @param connectionFactory
     * @return
     */
    @Autowired
    @Bean
    public JmsTemplate asynJmsTemplate(ConnectionFactory connectionFactory) {
        JmsTemplate template = new JmsTemplate(connectionFactory);
        template.setExplicitQosEnabled(true);
        template.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
        return template;
    }

    /**
     * 同步发送非持久化JmsTemplate
     * @param connectionFactory
     * @return
     */
    @Autowired
    @Bean
    public JmsTemplate synJmsTemplate(ConnectionFactory connectionFactory) {
        JmsTemplate template = new JmsTemplate(connectionFactory);
        return template;
    }
3.异步投递如何确认发送成功

异步投递丢失消息的场景:生产者设置UserAsyncSend=true,使用producer.send(message)持续发送消息,由于消息不阻塞,生产者会认为所有的send消息均被成功发送到MQ。如果MQ突然宕机,此时生产者端内存中尚未发送至MQ的消息都会丢失。

这时,可以给异步投递方式接收回调,以确认消息是否发送成功!

producer.send(textMessage,new AsyncCallback() {
        @Override public void onSuccess ()
        { // 使用msgid标识来进行消息发送成功的处理 
            System.out.println(msgid + " 消息发送成功");
        }
        @Override public void onException (JMSException exception)
        { // 使用msgid表示进行消息发送失败的处理
            System.out.println(msgid + " 消息发送失败");
            exception.printStackTrace();
        }
    });
4.延时投递

生产者提供两个发送消息的方法,一个是即时发送,一个是延时发送。

1.修改activemq.xml

<broker xmlns="http://activemq.apache.org/schema/core" ... schedulerSupport="true" > ...... </broker>

重点:schedulerSupport="true"

2.在代码中设置延时

    /**
     * 延时投递
     */
    @Test
    public void sendMessage() {
        Connection connection = null;
        Session session = null;
        ActiveMQMessageProducer producer = null;
        // 获取连接工厂
        ConnectionFactory connectionFactory = jmsMessagingTemplate.getConnectionFactory();
        try {
            connection = connectionFactory.createConnection();
            session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
            Queue queue = session.createQueue("springboot-queue");
            int count = 10;
            producer = (ActiveMQMessageProducer) session.createProducer(queue);
            producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
            //创建需要发送的消息
            TextMessage textMessage = session.createTextMessage("Hello");
            //设置延时时长(延时10秒)
            textMessage.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, 10000);
            producer.send(textMessage);
            session.commit();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
5.定时投递

1.启动类添加定时注解

package com.czy;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;

/**
 * @ProjectName: jms-producer
 * @Package: com.czy
 * @ClassName: AppProducer
 * @Author: 曹振远
 * @Description:
 * @Date: 2021/6/11 15:37
 * @Version: 1.0
 */
@SpringBootApplication
@EnableScheduling//开启定时功能
public class AppProducer {
    public static void main(String[] args) {
        SpringApplication.run(AppProducer.class, args);
    }
}

@Scheduled(fixedDelay = 3000)//定时任务注解
    public void sendQueue() {
        jmsMessagingTemplate.convertAndSend("springboot-queue", "消息ID:" + UUID.randomUUID().toString().substring(0, 6));
        System.out.println("消息发送成功...");
    }

死信队列

DLQ-Dead Letter Queue,死信队列,用来保存处理失败或者过期的消息。

以下情况,消息会被重发:

  1. 事务 rollback()。
  2. 事务没有调用 commit()。
  3. 没有开启事务,使用手动确认,session.recover()时。

当一个消息被重发6次(缺省为6次),会给broker发送一个“Poison ack",这个消息被认为是a poison pill,这个时候broker会将这个消息发送到死信队列,以便后续处理。

注意两点:

可以通过配置文件(activemq.xml)来调整死信的发送策略。

1.修改activemq.xml,配置每个队列自己的死信队列

<destinationPolicy>
    <policyMap>
        <policyEntries>
            <!--配置每个队列自己的死信队列-->
            <policyEntry queue=">">
                <deadLetterStrategy>
                    <individualDeadLetterStrategy queuePrefix="DLQ."
                     useQueueForQueueMessages="true" />
                </deadLetterStrategy>
            </policyEntry>
            <policyEntry topic=">" >
                <pendingMessageLimitStrategy>
                    <constantPendingMessageLimitStrategy limit="1000"/>
                </pendingMessageLimitStrategy>
            </policyEntry>
        </policyEntries>
    </policyMap>
</destinationPolicy>

2.RedeliveryPolicy重发策略,consumer

package com.czy.config;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.RedeliveryPolicy;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.connection.JmsTransactionManager;
import org.springframework.transaction.PlatformTransactionManager;

import javax.jms.ConnectionFactory;

/**
 * @ProjectName: jms-producer
 * @Package: com.czy.config
 * @ClassName: AriveMQConfig
 * @Author: 曹振远
 * @Description:
 * @Date: 2021/6/15 16:53
 * @Version: 1.0
 */
@Configuration
public class ActiveMQConfig {

    //RedeliveryPolicy重发策略设置
    @Bean
    public RedeliveryPolicy redeliveryPolicy(){
        RedeliveryPolicy  redeliveryPolicy=   new RedeliveryPolicy();
        //是否在每次尝试重新发送失败后,增长这个等待时间
        redeliveryPolicy.setUseExponentialBackOff(true);
        //重发次数,默认为6次   这里设置为10次
        redeliveryPolicy.setMaximumRedeliveries(10);
        //重发时间间隔,默认为1秒
        redeliveryPolicy.setInitialRedeliveryDelay(2);
        //第一次失败后重新发送之前等待500毫秒,第二次失败再等待500 * 2毫秒,这里的2就是value
        redeliveryPolicy.setBackOffMultiplier(2);
        //是否避免消息碰撞
        redeliveryPolicy.setUseCollisionAvoidance(false);
        //设置重发最大拖延时间-1 表示没有拖延只有UseExponentialBackOff(true)为true时生效
        redeliveryPolicy.setMaximumRedeliveryDelay(-1);

        return redeliveryPolicy;
    }

    @Bean
    public ActiveMQConnectionFactory activeMQConnectionFactory (@Value("${spring.activemq.broker-url}")String url, RedeliveryPolicy redeliveryPolicy){
        ActiveMQConnectionFactory activeMQConnectionFactory =
                new ActiveMQConnectionFactory(
                        "admin",
                        "admin",
                        url);
        activeMQConnectionFactory.setRedeliveryPolicy(redeliveryPolicy);
        return activeMQConnectionFactory;
    }

    @Bean
    public PlatformTransactionManager transactionManager(ConnectionFactory connectionFactory) {
        return new JmsTransactionManager(connectionFactory);
    }

    @Bean(name="jmsQueryListenerFactory")
    public DefaultJmsListenerContainerFactory   jmsListenerContainerFactory(ConnectionFactory connectionFactory,PlatformTransactionManager transactionManager){
        DefaultJmsListenerContainerFactory  factory=new DefaultJmsListenerContainerFactory ();
        factory.setTransactionManager(transactionManager);
        factory.setConnectionFactory(connectionFactory);
        factory.setSessionTransacted(true); // 开启事务
        factory.setSessionAcknowledgeMode(1);
        return factory;
    }
}

ActiveMQ企业面试经典问题

问题1:ActiveMQ宕机了怎么办?

ActiveMQ主从集群:Zookeeper集群+Replocated LevelDB+ActiveMQ集群

ActiveMQ集群.png

问题2:如何防止消费方消息重复消费?(消息幂等)

如果因为网络延迟等原因,MQ无法及时接收到消费方的应答,导致MQ重试,在重试过程中造成重复消费的问题。

解决思路:

问题3:如何防止消息丢失?

问题4:什么是死信队列?

MQ消息处理失败或者过期,消息不会丢失的一种机制。

上一篇下一篇

猜你喜欢

热点阅读