ActiveMQ

2020-05-08  本文已影响0人  拼搏男孩

一、概述

消息队列中间件是分布式系统中重要的组件,主要解决应用耦合,异步消息,流量削峰等问题。

实现高性能,高可用,可伸缩和最终一致性架构。是大型分布式系统不可缺少的中间件。

目前在生产环境,使用较多的消息队列有ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ等。

二、消息队列的应用场景

1、异步处理:用户注册后,需要发送注册邮件和注册短信,传统的做法有两种:串行和并行,引入消息队列后,异步处理,直接返回,提高效率

​ 2、应用解耦:用户下单后,订单系统需要通知库存系统,传统的做法是订单系统调用库存系统的接口,引入消息队列后订单系统将消息写入消息队列后直接返回,库存系统从消息队列中获取订单信息,这样就实现了订单系统与库存系统的解耦。

​ 3、流量削峰:秒杀与团抢活动中,可以在应用前端加入消息队列,可以控制活动的人数,缓解短时间内高流量压垮应用。用户的请求,服务器接收后,首先写入消息队列中。加入消息队列长度超过最大数量,则直接抛弃用户请求或跳转到错误页面。秒杀业务根据消息队列中的请求信息,再做后续处理。

​ 4、日志处理:日志采集客户端,负责日志数据采集,定时写入Kafka队列;Kafka消息队列,负责日志数据的接收,存储和转发;日志处理应用:订阅并消费kafka队列中的日志数据。

​ 5、消息通讯:消息通讯是指,消息队列一般都内置了高效的通信机制,因此也可以用在纯的消息通讯。比如实现点对点消息队列,或者聊天室等。

三、ActiveMQ

1、简介

ActiveMQ出身名门,是Apache门下的最流行的,能力强劲的开源消息总线。完全支持JMS1.1和J2EE1.4规范的JMS Provide实现。它从设计上保证了高性能的集群,当然实现了JMS的P2P与PubSub两种开发模式。

2、安装并运行

去官网http://activemq.apache.org/components/classic/download/下载ActiveMQ的最新版本。解压,进入到bin/win64文件夹下,并启动activemq.bat,在浏览器输入:http://localhost:8161/即可看到ActiveMQ的管理界面。点击:Manage ActiveMQ broker登录管理后台:(用户名、密码默认都为admin)。登录进去以后我们可以点击Queues看到消息队列以及消费者情况等。

4、代码示例

4.1 添加依赖

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.qianfeng</groupId>
    <artifactId>507activemq</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-all</artifactId>
            <version>5.15.12</version>
        </dependency>
    </dependencies>

</project>

由于不是spring项目,只是一个简单的spring项目,所以只添加了activemq-all这个依赖

4.2 P2P模式

消息队列有两种模式,一种称为p2p,点对点的模式,另一种称为pubsub(publish、subscribe)模式,一对多的模式,这两种模式各有特点。

生产者:

TestProvider.java

package com.qianfeng;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.junit.Test;

import javax.jms.*;

public class TestProvider {
    @Test
    public void testProvider(){
        //使用缺省的用户名、密码和url创建连接工厂
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                ActiveMQConnectionFactory.DEFAULT_USER,
                ActiveMQConnection.DEFAULT_PASSWORD,
                ActiveMQConnection.DEFAULT_BROKER_URL
        );
        try {
            //通过连接工厂创建连接对象
            Connection connection = connectionFactory.createConnection();
            //启动连接
            connection.start();
            //通过连接对象创建会话对象,第一个参数位是否开启事务
            Session session = connection.createSession(Boolean.TRUE,Session.AUTO_ACKNOWLEDGE);
            //P2P模式使用的是Queue
            Queue queue = session.createQueue("names");
            MessageProducer producer = session.createProducer(queue);
            sendMessage(session,producer);
            session.commit();
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }

    private void sendMessage(Session session, MessageProducer producer) {
        try {
            for (int i = 0; i < 10; i++) {
                TextMessage message = session.createTextMessage("zhangsan"+i);
                producer.send(message);
            }
        }catch (Exception e){
            e.printStackTrace();
        }

    }
}

TestConsumer.java

package com.qianfeng;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.junit.Test;

import javax.jms.*;

public class TestConsumer {
    @Test
    public void testConsumer(){
        //使用缺省的用户名、密码和url创建连接工厂
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                ActiveMQConnectionFactory.DEFAULT_USER,
                ActiveMQConnection.DEFAULT_PASSWORD,
                ActiveMQConnection.DEFAULT_BROKER_URL
        );
        try {
            //通过连接工厂创建连接对象
            Connection connection = connectionFactory.createConnection();
            //启动连接
            connection.start();
            //通过连接对象创建会话对象,第一个参数为是否开启事务
            Session session = connection.createSession(Boolean.TRUE,Session.AUTO_ACKNOWLEDGE);
            Queue queue = session.createQueue("names");
            //获取consumer
            MessageConsumer consumer = session.createConsumer(queue);
            while (true){
                TextMessage message = (TextMessage) consumer.receive(10000);
                if(message != null){
                    System.out.println(message.getText());
                }else {
                    break;
                }
            }
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

P2P模式的特点是消费者一定会收到,类似现实中的短信。

4.3 PubSub模式

PubSub模式类似现实中的广播,需要先启动消费者,再启动生产者,消费者才能收到消息。

TestTopicProvider.java

package com.qianfeng;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.junit.Test;

import javax.jms.*;

public class TestTopicProvider {
    @Test
    public void testTopicProducer(){
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://127.0.0.1:61616");
        try {
            Connection connection = connectionFactory.createConnection();
            connection.start();
            Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
            Topic topic = session.createTopic("test-topic");
            MessageProducer producer = session.createProducer(topic);
            sendMessage(session,producer);
            session.commit();
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
    private void sendMessage(Session session, MessageProducer producer) {
        try {
            for (int i = 0; i < 10; i++) {
                TextMessage message = session.createTextMessage("zhangsan"+i);
                System.out.println("activeMQ发送消息"+i);
                Thread.sleep(5000);
                producer.send(message);
            }
        }catch (Exception e){
            e.printStackTrace();
        }
    }
}

TestTopicConsumer.java

package com.qianfeng;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.junit.Test;

import javax.jms.*;

public class TestTopicConsumer {
    @Test
    public void testTopicConsumer(){
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://127.0.0.1:61616");
        try {
            Connection connection = connectionFactory.createConnection();
            connection.start();
            Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
            Topic topic = session.createTopic("test-topic");
            MessageConsumer consumer = session.createConsumer(topic);
            while (true){
                TextMessage message = (TextMessage) consumer.receive(10000);
                if(message!=null){
                    System.out.println("接收到消息:"+message.getText());
                }else {
                    break;
                }
            }
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}
上一篇 下一篇

猜你喜欢

热点阅读