消息中间件入门「一」:初识消息中间件【ActiveMQ】

2019-01-30  本文已影响0人  rpf_siwash

背景介绍

消息中间件相当于进程间通信的信托,可以降低复杂系统中各个模块间的耦合度。对于信托:你只需要把Message给我,就没你的事儿了。我负责给你送到目的地,就不需要你必须实时的守着,等待所有通信细节的完成。就算你突然挂了也没事,Message由信托给你存着,直到送到目的地才会消失。也就是说通信细节都由消息中间件完成,生产者只需要把消息给中间件即可,而消费者只需要绑定好地址。有了消息就会主动推送过来,也不需要消费者主逻辑线程守着。而webservice则是需要客户端与服务端都保持在线,客户端发起请求后必须等待服务端完成处理并将数据返回。由此可见中间件可以延迟处理也能够实时的处理,十分可靠。而webservice要求有较高的实时性,通信过程不允许出现宕机情况。也无法延时处理。

常用的消息中间件有:ActiveMQ,RabbitMQ,RocketMQ,kafka等。而ActiveMQ是一款完全遵循jms规范的消息中间件,支持多种协议如Stomp等,如图1所示。

图1 ActiveMQ的运用

第一步:安装ActiveMQ

  1. 访问ActiveMQ官方下载安装对应平台的消息安装包,点我进入下载地址
    图2 下载官网
  2. 解压到自己电脑上,进入bin目录下,在cmd中输入activemq start启动如图3。
    图3 启动activemq
  3. 运行bin/win64/activemq.bat 也可以快速启动

根据dos窗口内容了解activemq的启动信息

在启动好actviemq后,在dos窗口中可以看到输出了很多内容,那么具体代表什么含义呢:


图4 启动打印内容.png
       -->
        <transportConnectors>
            <!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
            <transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
            <transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
            <transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
            <transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
            <transportConnector name="ws" uri="ws://0.0.0.0:61614?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
        </transportConnectors>

第二步:使用actviemq

 <dependencies>
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-all</artifactId>
            <version>5.15.5</version>
        </dependency>
    </dependencies>

Tips:version最好与下载的actviemq版本一致

1.使用队列模式

package queue;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

public class QueueProducer {
    private static final String url="tcp://localhost:61616";
    private static final String queueName="HelloActiveMQ";

    public static void main(String[] args) throws JMSException, InterruptedException {
        //创建连接工厂
        ConnectionFactory connectionFactory=new ActiveMQConnectionFactory(url);
        //创建连接
        Connection connection=connectionFactory.createConnection();
        //连接启动
        connection.start();
        //通过连接获得session
        Session session=connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
        //创建目标
        Destination destination=session.createQueue(queueName);
        //创建一个生产者
        MessageProducer producer=session.createProducer(destination);
        for (int i = 0; i < 100; i++) {
            //创建消息
            TextMessage message=session.createTextMessage(String.format("hello Mq:%d", i));
            //发布消息
            producer.send(message);
            System.out.println("发送消息:"+message.getText());
            Thread.sleep(500);
        }
        //关闭连接
        connection.close();

    }
}
import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

public class MqConsumer {
    private static final String queueName="HelloActiveMQ";
    private static final String url="tcp://localhost:61616";

    public static void main(String[] args) throws JMSException {
        //获取连接工厂
        ConnectionFactory connectionFactory=new ActiveMQConnectionFactory(url);
        //获取连接
        Connection connectionn=connectionFactory.createConnection();
        //启动连接
        connectionn.start();
        //创建session
        Session session=connectionn.createSession(false,Session.AUTO_ACKNOWLEDGE);
        //绑定地址
        Destination destination=session.createQueue(queueName);
        //创建一个消费者对象
        MessageConsumer consumer=session.createConsumer(destination);
        //创建监听器
        consumer.setMessageListener(new MessageListener() {
            public void onMessage(Message message) {
                TextMessage textMessage= (TextMessage) message;
                try {
                    System.out.println("接收到消息:"+textMessage.getText());
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        });
    }
}

接收消息有两种方式,一种是同步方式,一种是异步方式。本例使用异步方式。使用同步方式代码如下:

Message message = consumer.receive();
String text = ((TextMessage) message).getText(); 
System.out.println("接收到消息:" + text); 

同步方式执行结束则不会再接收消息,如果要持续接收消息,需要写在while死循环中;异步方式似乎是为了解决这种问题,直接通过创建监听器的方式启动一个线程,会持续接收新消息。
Tips:可以看出无论消费者还是生产者,前面都要经过:

其前面的过程如同jdbc连接数据库一样,而后面根据消费者和生产者的不同角色定位,去发送或者接收消息。


接收到消息:hello Mq:0
…………
接收到消息:hello Mq:98
接收到消息:hello Mq:99
100条消息全部接收
  1. 启动一个生产者两个消费者
consumer 1:
……
接收到消息:hello Mq:9
接收到消息:hello Mq:11
接收到消息:hello Mq:13
接收到消息:hello Mq:15
接收到消息:hello Mq:17
接收到消息:hello Mq:19
接收到消息:hello Mq:21
接收到消息:hello Mq:23
接收到消息:hello Mq:25
consumer 2:
……
接收到消息:hello Mq:10
接收到消息:hello Mq:12
接收到消息:hello Mq:14
接收到消息:hello Mq:16
接收到消息:hello Mq:18
接收到消息:hello Mq:20
接收到消息:hello Mq:22
接收到消息:hello Mq:24
接收到消息:hello Mq:26

可以看出队列模式下确实为一对一,点对点,消费者收到的每一条消息都彼此不相同,一个为奇数条,一个为偶数条。并且队列模式下,即时中途关闭actviemq,消费者没取完的消息也会被保存起来,在下次运行时消费者可以接着接收消息。

2. 使用主题模式

package topic;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

public class TopicProducer {
    private static final String url="tcp://localhost:61616";
    private static final String queueName="queue";

    public static void main(String[] args) throws JMSException, InterruptedException {
        //创建连接工厂
        ConnectionFactory connectionFactory=new ActiveMQConnectionFactory(url);
        //创建连接
        Connection connection=connectionFactory.createConnection();
        //连接启动
        connection.start();
        //通过连接获得session
        Session session=connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
        //创建目标
        Destination destination=session.createTopic(queueName);
        //创建一个生产者
        MessageProducer producer=session.createProducer(destination);
        for (int i = 0; i < 100; i++) {
            //创建消息
            TextMessage message=session.createTextMessage(String.format("hello Mq:%d", i));
            //发布消息
            producer.send(message);
            System.out.println("发送消息:"+message.getText());
            Thread.sleep(500);
        }
        //关闭连接
        connection.close();

    }
}

可以看出于队列模式唯一区别就是创建目标变成了Topic:

//创建目标
Destination destination=session.createTopic(queueName);
//创建目标
Destination destination=session.createTopic(queueName);

消费者所有代码:

package topic;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

public class TopicConsumer {
    private static final String queueName = "queue";
    private static final String url = "tcp://localhost:61616";

    public static void main(String[] args) throws JMSException {
        //获取连接工厂
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
        //获取连接
        Connection connectionn = connectionFactory.createConnection();
        //启动连接
        connectionn.start();
        //创建session
        Session session = connectionn.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //绑定地址
        Destination destination = session.createTopic(queueName);
        //创建一个消费者对象
        MessageConsumer consumer = session.createConsumer(destination);
        //创建监听器
        consumer.setMessageListener(new MessageListener() {
            public void onMessage(Message message) {
                TextMessage textMessage = (TextMessage) message;
                try {
                    Thread.sleep(500);
                    System.out.println("接收到消息:" + textMessage.getText());
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });

    }
}

由于主题模式中,生产者所有消息,在订阅了该主题的所有消费者中都能收到。因此直接启动一个生产者和两个消费者,这里启动时候必须先启动消费者,不然会丢失部分消息。

consumer1:
接收到消息:hello Mq:0
接收到消息:hello Mq:1
接收到消息:hello Mq:2
接收到消息:hello Mq:3
接收到消息:hello Mq:4
接收到消息:hello Mq:5
接收到消息:hello Mq:6
接收到消息:hello Mq:7
接收到消息:hello Mq:8
接收到消息:hello Mq:9
接收到消息:hello Mq:10
consumer2:
接收到消息:hello Mq:0
接收到消息:hello Mq:1
接收到消息:hello Mq:2
接收到消息:hello Mq:3
接收到消息:hello Mq:4
接收到消息:hello Mq:5
接收到消息:hello Mq:6
接收到消息:hello Mq:7
接收到消息:hello Mq:8
接收到消息:hello Mq:9
接收到消息:hello Mq:10

可以看出所有消费者在同一时间段内接收到的消息都是一样的,并且在订阅前的消息会丢失。进入Topic管理页面中可以查看丢失的消息,以及推送的消息。如图10:

图10 topic管理页面
图中丢失的消息数量=生产消息数量*消费者数量-消费出去的消息数量。
因此由于主题模式的这种特性,消费者一定要在生产者生产消息前订阅好destination

activemq支持的消息(Message)数据类型

Activemq支持的Message如下:

  1. TextMessage 文本消息:携带一个java.lang.String作为有效数据(负载)的消息,可用于字符串类型的信息交换
  2. ObjectMessage 对象消息:携带一个可以序列化的Java对象作为有效负载的消息,可用于Java对象类型的信息交换;
  3. MapMessage 映射消息:携带一组键值对的数据作为有效负载的消息,有效数据值必须是Java原始数据类型(或者它们的包装类)及String。即:byte , short , int , long , float , double , char , boolean , String
  4. BytesMessage 字节消息 :携带一组原始数据类型的字节流作为有效负载的消息;
  5. StreamMessage 流消息:携带一个原始数据类型流作为有效负载的消息,它保持了写入流时的数据类型,写入什么类型,则读取也需要是相同的类型;

可以看出最常用的应该还是ObjectMessage类型,TextMessage只是简单的字符串传输,不适用于复杂的数据交互。而MapMessage只支持基本数据类型和String。BytesMessage则偏向于底层。StreamMessage任然还是支持的数据类型有限制。

如何使用java对象传输消息

使用java对象传输消息官网给出了两种方式:

具体步骤(信任所有package)

  1. 在生产者和消费者连接公共部分connectionFactory调用setTrustAllPackages设置为true
ConnectionFactory connectionFactory=new ActiveMQConnectionFactory(url);
((ActiveMQConnectionFactory) connectionFactory).setTrustAllPackages(true);
  1. 生产者生产消息使用ObjectMessage传出消息,代码如下:
 //创建一个生产者
        MessageProducer producer=session.createProducer(destination);
        for (int i = 0; i < 100; i++) {
            //创建消息
            User user=new User();
            user.setAge(i);
            user.setName((i%2==0? "Mr":"Miss")+String.valueOf((char)(int)(Math.random()*26+'A'))+String.valueOf((char)(int)(Math.random()*26+'a')));
            user.setSex(i%2==0? "男":"女");
            user.setSalary((int)(Math.random()*10000));
            user.setNumber(UUID.randomUUID().toString());
            ArrayList<User> users=new ArrayList<User>();
            HashMap<String,User> map=new HashMap<String, User>();
            map.put(String.format("第%d个", i),user);
            users.add(user);
            ObjectMessage message=session.createObjectMessage(map);
            //TextMessage message=session.createTextMessage(String.format("hello Mq:%d", i));
            //发布消息
            producer.send(message);
            System.out.println("发送消息:"+message.getObject());
            Thread.sleep(1);
        }

运行效果如下:

发送消息:{第0个=User{name='MrEt', age=0, sex='男', number='b55a5bb5-31be-4df3-9050-adaa69d46482', salary=2266}}
发送消息:{第1个=User{name='MissZe', age=1, sex='女', number='df20aa34-1956-4401-809a-fb44ab524ad0', salary=781}}
发送消息:{第2个=User{name='MrFs', age=2, sex='男', number='7ec61656-f40f-4759-b967-a92df7a8889f', salary=7582}}
发送消息:{第3个=User{name='MissIm', age=3, sex='女', number='7ad7107d-7648-40b4-84f6-4e5f815771b6', salary=4046}}
发送消息:{第4个=User{name='MrZc', age=4, sex='男', number='fe01b62d-b0fb-4054-a641-40fe8f0c8cbc', salary=5659}}
发送消息:{第5个=User{name='MissJa', age=5, sex='女', number='a525b377-d05b-499c-a4eb-b6bcdecaf39a', salary=7397}}
发送消息:{第6个=User{name='MrWg', age=6, sex='男', number='2ea6fe19-dec3-4c34-8bc8-1ae6633ea4fa', salary=788}}
发送消息:{第7个=User{name='MissIw', age=7, sex='女', number='60c57b23-e092-4b94-912d-fc03979fc4bd', salary=7408}}
发送消息:{第8个=User{name='MrOt', age=8, sex='男', number='76454c8c-d00d-464e-8e74-e57b4d63d01e', salary=858}}
发送消息:{第9个=User{name='MissJi', age=9, sex='女', number='ac5bd1ef-0421-
  1. 消费者接收消息,对应的也是将TextMessage修改为ObjectMessage,代码如下:
//创建一个消费者对象
        MessageConsumer consumer=session.createConsumer(destination);
        //创建监听器
        consumer.setMessageListener(new MessageListener() {
            public void onMessage(Message message) {
                //TextMessage textMessage= (TextMessage) message;
                ObjectMessage Objmessage= (ObjectMessage) message;
                try {
                    System.out.println("接收到消息:"+Objmessage.getObject());
                    Thread.sleep(1000);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });

运行效果如下:

接收到消息:{第0个=User{name='MrEt', age=0, sex='男', number='b55a5bb5-31be-4df3-9050-adaa69d46482', salary=2266}}
接收到消息:{第1个=User{name='MissZe', age=1, sex='女', number='df20aa34-1956-4401-809a-fb44ab524ad0', salary=781}}
接收到消息:{第2个=User{name='MrFs', age=2, sex='男', number='7ec61656-f40f-4759-b967-a92df7a8889f', salary=7582}}
接收到消息:{第3个=User{name='MissIm', age=3, sex='女', number='7ad7107d-7648-40b4-84f6-4e5f815771b6', salary=4046}}
接收到消息:{第4个=User{name='MrZc', age=4, sex='男', number='fe01b62d-b0fb-4054-a641-40fe8f0c8cbc', salary=5659}}
接收到消息:{第5个=User{name='MissJa', age=5, sex='女', number='a525b377-d05b-499c-a4eb-b6bcdecaf39a', salary=7397}}
接收到消息:{第6个=User{name='MrWg', age=6, sex='男', number='2ea6fe19-dec3-4c34-8bc8-1ae6633ea4fa', salary=788}}
接收到消息:{第7个=User{name='MissIw', age=7, sex='女', number='60c57b23-e092-4b94-912d-fc03979fc4bd', salary=7408}}
接收到消息:{第8个=User{name='MrOt', age=8, sex='男', number='76454c8c-d00d-464e-8e74-e57b4d63d01e', salary=858}}
接收到消息:{第9个=User{name='MissJi', age=9, sex='女', number='ac5bd1ef-0421-485e-b118-eb840d2172ce', salary=5612}}
}

总结

  • 结合activemq的管理页面学习/开发可以加深理解
  • activemq支持的每种协议端口都是不同的,如果使用stomp,则注意根据启动时dos窗口的info信息去确定对应的地址,修改端口则是在activemq根目录下config/actviemq.xml
  • 注意队列模式与主题模式的区别,尤其是主题模式下,消费者一定要在生产者生产前建立连接
  • 使用ObjectMessage要在生产者和消费者的连接中设置TrustPackage

    知识来源于互联网,因此所学知识也将分享到互联网,希望能给像我一样迷茫萌新提供帮助

上一篇 下一篇

猜你喜欢

热点阅读