消息队列之ActiveMQ

2020-05-08  本文已影响0人  Demon先生

1. 什么是ActiveMQ

1.1. 介绍

ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现,尽管JMS规范出台已经是很久的事情了,但是JMS在当今的J2EE应用中间仍然扮演着特殊的地位。

1.2. ActiveMQ的消息形式

ActiveMQ对于消息的传递有两种类型:

第一种:点对点的,即一个生产者和一个消费者一一对应。


image.png

第二种:发布/订阅模式,即一个生产者产生消息并进行发送后,可以由多个消费者进行接收。

image.png

对于消息正文格式,以及调用的消息类型。JMS定义了五种,允许发送并接收一些不同形式的数据。

· StreamMessage -- Java原始值的数据流

· MapMessage--一套名称-值对

· TextMessage--一个字符串对象

· ObjectMessage--一个序列化的 Java对象

· BytesMessage--一个字节的数据流

2. ActiveMQ服务器的安装

ActiveMQ是java语言开发的,需要使用到jdk,这里介绍在Linux系统种进行安装。具体步骤如下:

第一步:下载ActiveMQ压缩包,上传并解压到Linux系统。

第二步:进入解压包路径/bin目录下,启动ActivceMQ,相关操作命令如下:

启动:
[root@localhost bin]# ./activemq start
关闭:
[root@localhost bin]# ./activemq stop
查看状态:
[root@localhost bin]# ./activemq status
2222.png

第三步:检测是否启动成功,在浏览器中访问网址:ip:8161/admin,在不修改配置文件的前提下,8161端口为默认端口,用户名:admin,密码:admin

image.png

3. ActiveMQ的三种使用模式

在进行ActiveMQ操作之前,需要在Maven工程pom.xml文件中引入ActiveMQ的jar包,引用如下:

   <!-- activemq 消息队列配置-->
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-all</artifactId>
            <version>5.13.0</version>
        </dependency>

3.1. Queue点对点模式

Queue 是点对点模式,只能是一个生产者产生一个消息,被一个消费者消费。默认是存在于MQ的服务器中的,发送消息之后,消费者随时取。但是一定是一个消费者取,消费完消息也就没有了。

3.1.1 构建Producer生产者

Producer为生产者,生产并发送消息。

第一步:创建ConnectionFactory对象,需要指定服务端ip及端口号。

第二步:使用ConnectionFactory对象创建一个Connection对象。

第三步:开启连接,调用Connection对象的start方法。

第四步:使用Connection对象创建一个Session对象。

第五步:使用Session对象创建一个Destination对象(topic、queue),此处创建一个Queue对象。

第六步:使用Session对象创建一个Producer对象。

第七步:创建一个Message对象,创建一个TextMessage对象。

第八步:使用Producer对象发送消息。

第九步:关闭资源。

//生产者发送消息
    @Test
    public void send() throws Exception{
        //1.创建一个连接工厂 connectionfactory
        //参数:就是要连接的服务器的地址
        ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://192.168.25.129:61616");
        //2.通过工厂获取连接对象 创建连接
        Connection connection = factory.createConnection();
        //3.开启连接
        connection.start();
        //4.创建一个session对象  提供发送消息等方法
        //第一个参数:表示是否开启分布式事务(JTA)  一般是false 不开启。
        //第二个参数:就是设置消息的应答模式   如果 第一个参数为false时,第二个参数设置才有意义。用的是自动应答
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);//
        //5.创建目的地 (destination)  queue  
        //参数:目的地的名称
        Queue queue = session.createQueue("queue-test");
        //6.创建个生产者
        MessageProducer producer = session.createProducer(queue);
        //7.构建消息的内容  
        TextMessage textMessage = session.createTextMessage("queue测试发送的消息");
        //8.发送消息
        producer.send(textMessage);
        //9.关闭资源
        producer.close();
        session.close();
        connection.close();
    }

3.1.2 构建Consumer消费者

Consumer为消费者,负责接收和处理消息。

第一步:创建一个ConnectionFactory对象。

第二步:从ConnectionFactory对象中获得一个Connection对象。

第三步:开启连接。调用Connection对象的start方法。

第四步:使用Connection对象创建一个Session对象。

第五步:使用Session对象创建一个Destination对象。和发送端保持一致queue,并且队列的名称一致。

第六步:使用Session对象创建一个Consumer对象。

第七步:接收消息。

第八步:打印消息。

第九步:关闭资源

@Test
    public void recieve() throws Exception {
        //1.创建连接的工厂
        ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://192.168.25.129:61616");
        //2.创建连接
        Connection connection = factory.createConnection();
        //3.开启连接
        connection.start();
        //4.创建session
        //第一个参数:表示是否开启分布式事务(JTA)  一般是false 不开启。
        //第二个参数:就是设置消息的应答模式   如果 第一个参数为false时,第二个参数设置才有意义。用的是自动应答
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //5.创建接收消息的一个目的地
        Queue queue = session.createQueue("queue-test");
        //6.创建消费者
        MessageConsumer consumer = session.createConsumer(queue);
        //7.接收消息 打印
        //设置一个监听器
        //System.out.println("start");
        //这里其实开辟了一个新的线程
        consumer.setMessageListener(new MessageListener() {
            
            //当有消息的时候会执行以下的逻辑
            @Override
            public void onMessage(Message message) {
                if(message instanceof TextMessage){
                    TextMessage message2 = (TextMessage) message;
                    try {
                        System.out.println("接收的消息为"+message2.getText());
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            }
        });
        //System.out.println("end");
        Thread.sleep(199999);
        //8.关闭资源
        consumer.close();
        session.close();
        connection.close();
    }

3.2. Topic发布订阅模式

Topic 是发布订阅模式,一个生产者可以一个消息,可以被多个消费者消费。默认是不存在于MQ服务器中的,一旦发送之后,如果没有订阅,消息则丢失。

3.2.1 构建Producer生产者

Producer为生产者,生产并发送消息。

第一步:创建ConnectionFactory对象,需要指定服务端ip及端口号。

第二步:使用ConnectionFactory对象创建一个Connection对象。

第三步:开启连接,调用Connection对象的start方法。

第四步:使用Connection对象创建一个Session对象。

第五步:使用Session对象创建一个Destination对象(topic、queue),此处创建一个Topic对象。

第六步:使用Session对象创建一个Producer对象。

第七步:创建一个Message对象,创建一个TextMessage对象。

第八步:使用Producer对象发送消息。

第九步:关闭资源。

// 发送topic
    @Test
    public void send() throws Exception {
        //1.创建连接工厂
        ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://192.168.25.129:61616");
        //2.创建连接
        Connection connection = factory.createConnection();
        //3.开启连接
        connection.start();
        //4.创建session
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //5.创建目的地    topic
        Topic createTopic = session.createTopic("topic-test");
        //6.创建生成者
        MessageProducer producer = session.createProducer(createTopic);
        //7.构建消息对象
        TextMessage createTextMessage = session.createTextMessage("topic发送的消息123");
        //8.发送消息
        producer.send(createTextMessage);
        //9.关闭资源
        producer.close();
        session.close();
        connection.close();
    }

3.2.2 构建Consumer消费者

Consumer为消费者,负责接收和处理消息。

第一步:创建一个ConnectionFactory对象。

第二步:从ConnectionFactory对象中获得一个Connection对象。

第三步:开启连接。调用Connection对象的start方法。

第四步:使用Connection对象创建一个Session对象。

第五步:使用Session对象创建一个Destination对象。和发送端保持一致topic,并且话题的名称一致。

第六步:使用Session对象创建一个Consumer对象。

第七步:接收消息。

第八步:打印消息。

第九步:关闭资源

@Test
    public void reieve() throws Exception {
        // 1.创建连接的工厂 指定MQ服务器的地址
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.129:61616");
        // 2.获取连接
        Connection connection = connectionFactory.createConnection();
        // 3.开启连接
        connection.start();
        // 4.根据连接对象创建session (提供了操作activmq的方法)
        // 第一个参数:表示是否开启分布式事务(JTA) 一般就是false :表示不开启。 只有设置了false ,第二个参数才有意义。
        // 第二个参数:表示设置应答模式 自动应答和手动应答 。使用的是自动应答
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // 5.根据session创建目的地(destination)
        Topic topic = session.createTopic("topic-test");
        // 6.创建消费者;
        MessageConsumer consumer = session.createConsumer(topic);
        // 7.接收消息
        // 设置一个监听器 就是开启了一个新的线程
        System.out.println("start");
        consumer.setMessageListener(new MessageListener() {

            @Override
            public void onMessage(Message message) {
                if (message instanceof TextMessage) {
                    TextMessage message2 = (TextMessage) message;
                    String text = "";
                    try {
                        text = message2.getText();
                    } catch (JMSException e) {
                        e.printStackTrace();
                    } // 获取消息的内容
                    System.out.println(text);
                }
                System.out.println();
            }
        });
        System.out.println("end");
        // 睡眠
        Thread.sleep(10000000);

        // 9.关闭资源
        consumer.close();
        session.close();
        connection.close();
    }

3.3. Spring整合模式

在Maven工程pom.xml文件中添加Spring相关的jar包,引用如下:

        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-jms</artifactId>
            <version> 4.2.4.RELEASE</version>
        </dependency>

3.3.1 配置Spring和ActiveMQ配置文件

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:context="http://www.springframework.org/schema/context" xmlns:p="http://www.springframework.org/schema/p"
    xmlns:aop="http://www.springframework.org/schema/aop" xmlns:tx="http://www.springframework.org/schema/tx"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.2.xsd
    http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.2.xsd
    http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.2.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.2.xsd
    http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-4.2.xsd">
    
    <bean id="targetConnection" class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="tcp://192.168.25.129:61616"/>
    </bean>
    <!-- 通用的connectionfacotry 指定真正使用的连接工厂 -->
    <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
        <property name="targetConnectionFactory" ref="targetConnection"/>
    </bean>
    <!-- 接收和发送消息时使用的类 模板对象-->
    <bean class="org.springframework.jms.core.JmsTemplate">
        <property name="connectionFactory" ref="connectionFactory"/>
    </bean>
    <!-- <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg name="name" value="item-change-queue"></constructor-arg>
    </bean> -->
    <bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic">
        <constructor-arg name="name" value="item-change-topic"/>
    </bean>
    
    <!-- 监听器 -->
    <bean id="myMessageListener" class="com.demon.activemq.spring.MyMessageListener"/>
    <!-- 监听容器,作用:启动线程做监听 -->
    <bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="connectionFactory"/>
        <property name="destination" ref="topicDestination"/>
        <property name="messageListener" ref="myMessageListener"/>
    </bean>

    <bean id="myMessageListener2" class="com.demon.activemq.spring.MyMessageListener"/>
    <!-- 监听容器,作用:启动线程做监听 -->
    <bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="connectionFactory"/>
        <property name="destination" ref="topicDestination"/>
        <property name="messageListener" ref="myMessageListener2"/>
    </bean>
</beans>

3.3.2 生产者发送消息

第一步:初始化一个spring容器

第二步:从容器中获得JMSTemplate对象。

第三步:从容器中获得一个Destination对象

第四步:使用JMSTemplate对象发送消息,需要知道Destination

@Test
    public void send() throws Exception{
        //1.初始化spring容器
        ApplicationContext context = new ClassPathXmlApplicationContext("classpath:/resource/applicationContext-activemq.xml");
        //2.获取到jmstemplate的对象
        JmsTemplate jmsTemplate = context.getBean(JmsTemplate.class);
        //3.获取destination
        Destination destination = (Destination) context.getBean(Destination.class);
        //4.发送消息
        jmsTemplate.send(destination, new MessageCreator() {
            
            @Override
            public Message createMessage(Session session) throws JMSException {
                return session.createTextMessage("通过spring发送的消息123");
            }
        });
        Thread.sleep(100000);
    }

3.3.3 消费者接收消息

创建一个MessageListener的实现类。

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

public class MyMessageListener implements MessageListener {

    @Override
    public void onMessage(Message message) {
        //获取消息
        if(message instanceof TextMessage){
            TextMessage textMessage = (TextMessage)message;
            String text;
            try {
                text = textMessage.getText();
                System.out.println(text);
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }
}

文档下载地址:

https://wenku.baidu.com/view/94f62ed6876fb84ae45c3b3567ec102de3bddf14

上一篇下一篇

猜你喜欢

热点阅读