消息中间件ActiveMQ

2019-07-05  本文已影响0人  _52Hertz

消息中间件概述

中间件介绍

什么是中间件?

非底层操作系统软件,非业务应用软件,不是直接给最终用户使用的,不能直接给用户带来价值的软件统称中间件。

什么是消息中间件

关注于数据的发送与接收,利用高效可靠的异步消息传递机制集成分布式系统。

消息中间件图示


应用A通过应用程序接口向消息中间件发送消息,应用B通过应用程序接口向消息中间件接收消息。

什么是JMS

Java消息服务(Java Message Service)即JMS,是一个Java平台中关于面向消息中间件的API,用于在两个应用程序之间,或分布式系统中发送/接收消息,进行异步通信。

什么是AMQ

AMQP,即Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准协议,基于此协议的客户端与消息中间件可传递消息,并不受客户端中间件不同产品,不同的开发语言等条件的限制。

JMS和AMQP对比

常见消息中间件对比

ActiveMQ

RabbitMQ

Kafka

Kafka是一种高吞吐量的分布式发布订阅消息系统,是一个分布式的、分区的、可靠的分布式日志存储服务。它通过一种独一无二的设计提供了消息系统的功能。

特性
综合对比

JMS规范

Java消息服务定义

Java消息服务(Java Message Service)即JMS,是一个Java平台中关于面向消息中间件的API,用于在两个应用程序之间,或分布式系统中发送/接收消息,进行异步通信。

JMS概念

消息模式

队列模型

队列模型示意图

主题模型

队列模型示意图

JMS编码规范

JMS编码接口之间的关系

windows下安装ActiveMQ

Linux下安装ActiveMQ

队列模式的消息演示

pom.xml

    <dependencies>
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-all</artifactId>
            <version>5.13.0</version>
        </dependency>
    </dependencies>

ActiveMqProducer.java

package com.cxy.jms.queue;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

/**
 * @Auther: cxy
 * @Date: 2019/7/4
 * @Description: 生产者
 */
public class ActiveMqProducer {
    private static final String url = "tcp://192:168.31.10:61616";
    private static final String queueName = "queue-test";

    public static void main(String[] args) throws JMSException {
        //1.创建ConnectionFactory
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);

        //2.创建Connection
        Connection connection = connectionFactory.createConnection();

        //3.启动连接
        connection.start();

        //4.创建会话
        Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);

        //5.创建一个目标
        Destination destination = session.createQueue(queueName);

        //6.创建一个生产者
        MessageProducer producer = session.createProducer(destination);

        for(int i=0;i<100;i++){
            //7.创建消息
            TextMessage textMessage = session.createTextMessage("test" + i);

            //8.发布消息
            producer.send(textMessage);

            System.out.println("发送消息:"+textMessage.getText());
        }

        //9.关闭连接
        connection.close();
    }
}

ActiveMqConsumer.java

package com.cxy.jms.queue;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

/**
 * @Auther: cxy
 * @Date: 2019/7/4
 * @Description: 消费者
 */
public class ActiveMqConsumer {
    private static final String url = "tcp://192:168.31.10:61616";
    private static final String queueName = "queue-test";

    public static void main(String[] args) throws JMSException {
        //1.创建ConnectionFactory
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);

        //2.创建Connection
        Connection connection = connectionFactory.createConnection();

        //3.启动连接
        connection.start();

        //4.创建会话
        Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);

        //5.创建一个目标
        Destination destination = session.createQueue(queueName);

        //6.创建消费者
        MessageConsumer consumer = session.createConsumer(destination);

        //7.创建一个监听器
        consumer.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                TextMessage textMessage = (TextMessage) message;
                try {
                    System.out.println("接收消息:"+textMessage.getText());
                }catch (JMSException e){
                    e.printStackTrace();
                }
            }
        });

    }
}

主题模式的消息演示

ActiveMqProducer.java

package com.cxy.jms.topic;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

/**
 * @Auther: cxy
 * @Date: 2019/7/4
 * @Description: 生产者
 */
public class ActiveMqProducer {
    private static final String url = "tcp://192:168.31.10:61616";
    private static final String topicName = "topic-test";

    public static void main(String[] args) throws JMSException {
        //1.创建ConnectionFactory
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);

        //2.创建Connection
        Connection connection = connectionFactory.createConnection();

        //3.启动连接
        connection.start();

        //4.创建会话
        Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);

        //5.创建一个目标
        Destination destination = session.createTopic(topicName);

        //6.创建一个生产者
        MessageProducer producer = session.createProducer(destination);

        for(int i=0;i<100;i++){
            //7.创建消息
            TextMessage textMessage = session.createTextMessage("test" + i);

            //8.发布消息
            producer.send(textMessage);

            System.out.println("发送消息:"+textMessage.getText());
        }

        //9.关闭连接
        connection.close();
    }
}

ActiveMqConsumer.java

package com.cxy.jms.topic;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

/**
 * @Auther: cxy
 * @Date: 2019/7/4
 * @Description: 消费者
 */
public class ActiveMqConsumer {
    private static final String url = "tcp://192:168.31.10:61616";
    private static final String topicName = "topic-test";

    public static void main(String[] args) throws JMSException {
        //1.创建ConnectionFactory
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);

        //2.创建Connection
        Connection connection = connectionFactory.createConnection();

        //3.启动连接
        connection.start();

        //4.创建会话
        Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);

        //5.创建一个目标
        Destination destination = session.createTopic(topicName);

        //6.创建消费者
        MessageConsumer consumer = session.createConsumer(destination);

        //7.创建一个监听器
        consumer.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                TextMessage textMessage = (TextMessage) message;
                try {
                    System.out.println("接收消息:"+textMessage.getText());
                }catch (JMSException e){
                    e.printStackTrace();
                }
            }
        });

    }
}

activemq模式区分

队列模式:生产者发送消息,所有消费者对消息进行平分,已消费的消息不能重新消费
主题模式:生产者发送消息,所有已订阅主题的消费者都能收到消息。

上一篇 下一篇

猜你喜欢

热点阅读