消息中间件-activemq消息机制和持久化介绍

2019-12-28  本文已影响0人  Lemonrel

我们知道activemq的使用方式非常简单有如下几个步骤:
1.创建连接工厂
2.创建连接
3.创建会话
4.创建目的地
5.创建生产者或消费者
6.生产或消费消息
7.关闭生产或消费者、关闭会话、关闭连接
这一节我们针对他的消息传播机制和持久化方式做一个简单的学习。在会用的同时我们也需要理解一些基本的概念,这样才不至于在出错后无从下手。
1.activemq服务器工作模型
我们先看一下消息发送的时序图:



ConnectionFactory 对象创建一个连接工厂,消息的发送和接受服务均由此进行;
ConnectionFactory 创建一个活动Connection作为当前使用的连接;
Session 是一个用于生成和使用消息的单线程上下文,它用于创建发送的生产者和接收消息的消费者,并为所发送的消息定义发送顺序。会话通过大量确认选项或通过事务来支持可靠传送。
户端使用 MessageProducer 向指定的物理目标发送消息,生产者可指定一个默认传送模式(持久性消息与非持久性消息)、优先级和有效期值,以控制生产者向物理目标发送的所有消息;
消费者可以支持同步或异步消息接收。异步使用可通过向消费者注册 MessageListener 来实现。当会话线程调用 MessageListener 对象的 onMessage 方法时,客户端将使用消息。

2.ActiveMQ消息传送模型
ActiveMQ 支持两种消息传送模型:PTP(即点对点模型)和Pub/Sub(即发布 /订阅模型),前面我们已经讲过,在此就不赘述。

3.消息选择器
ActiveMQ提供了一种机制,使用它,消息服务可根据消息选择器中的标准来执行消息过滤。生产者可在消息中放入应用程序特有的属性,而消费者可使用基于这些属性的选择标准来表明对消息是否感兴趣。
消息选择器是根据 header 和 properties 允许客户端选择性的制定需要接收的消息,消息选择器是无法利用 消息主体(Body)进行过滤的。无论你的消息主题是什么类型, 文本、或者对象、或者键值对。下面我们讲一下消息选择器的语法以及使用规范:
可接收的类型包括:byte,int,double,boolean,String;
属性标识符定义:
1)变量名与java定义一样;
2)要么在heads中定义 要么在 properties中定义,如果在sender中是在heads中定义而receiver中却从properties中寻找的话,找不到的情况下他是不会自动去heads中寻找的,而是会返回null;
3)根据不同类型的变量选择不同的方法:
message.setIntProperty("test",14);
4)那么在接收端可以对该变量进行拦截:
session.createConsumer(destination,"test > 14");
属性标志符是区分大小写的;
拦截器中的部分表示方式:
1)可以是条件表达式
2)可以是算术表达式
3)可以是比较运算和逻辑运算组成的表达式

支持 () 左右括号;
支持逻辑运算的优先顺序表达式 例如: NOT , AND , OR;
比较运算符有: = , > , >= , < , <= , <> (not equal);
eg:

标识符是null
"prop_name IS NULL"
标识符非空 not null
"prop_name IS NOT NULL"
"age BETWEEN 15 AND 19" is equivalent to "age >= 15 AND age <= 19"
"Country NOT IN (' UK', 'US', 'France') "

代码很简单,只需要在Sender端做如下改写:

TextMessage message = session.createTextMessage();
message.setIntProperty("test",14);
message.setText("test");

Receiver端:

consumer = session.createConsumer(destination,"test > 14");

对发送端的特定字符做一个判断符合条件即被拦截

4.消息确认机制
jms消息只有在被确认之后才认为成功消费了这条消息。消息的成功消费通常包括三个步骤:
(1)client接收消息
(2)client处理消息
(3)消息被确认(也就是client给一个确认消息)
在事务性会话中当一个事务被提交的时候,确认自动发生,和应答模式没关系,这个值可以随便写。(这里多提一句异步消息接收中不能使用事务性会话)。
在非事务性会话中消息何时被确认取决于创建的session中设置的消息应答模式(acknowledge model)该参数有三个值:
1)Session.AUTO_ACKNOWLEDGE:当client端成功的从receive方法或从onMessage(Message message) 方法返回的时候,会话自动确认client收到消息。
2)Session.CLIENT_ACKNOWLEDGE: 客户单通过调用acknowledge方法来确认客户端收到消息。但需要注意在这种应答模式下,确认是在会话层上进行的,确认一个被消费的消息将自动确认所有已消费的其他消息。比如一个消费者已经消费了10条消息,然后确认了第5条消息被消费,则这10条都被确认消费了。、
acknowledge()通知方法是在Message对象上,同步接收,调用acknowledge()方法进行确认如下所示:

consumer = session.createConsumer(queue);  
Message message = consumer.receive();  
message.acknowledge(); 

异步接受,调用acknowledge()方法进行确认:

consumer.setMessageListener(new MessageListener() {
    @Override
    public void onMessage(Message message) {
        TextMessage textMessage = (TextMessage) message;
        try {
            String value = textMessage.getText();
            System.out.println("value: " + value);
            message.acknowledge(); //消息消费确认通知
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
});

3)Session.DUPS_ACKNOWLEDGE:不是必须签收,消息可能会重复发送。在第二次重新传送消息的时候,消息头的JmsDelivered会被置为true标示当前消息已经传送过一次,客户端需要进行消息的重复处理控制。

  1. 持久化消息
    JMS 支持以下两种消息提交模式:
    5.1 ERSISTENT 持久消息
    是activemq默认的传送方式,此方式下的消息在配合activemq.xml中配置的消息存储方式,会被存储在特定的地方,直到有消费者将消息消费或者消息过期进入DLQ队列,消息生命周期才会结束。此模式下可以保证消息只会被成功传送一次和成功使用一次,消息具有可靠性。在消息传递到目标消费者,在消费者没有成功应答前,消息不会丢失。所以很自然的,需要一个地方来持久性存储。如果消息消费者在进行消费过程发生失败,则消息会被再次投递。
    DeliveryMode.PERSISTENT 指示JMS provider持久保存消息,以保证消息不会因为JMS provider的失败而丢失。 消息持久化在硬盘中,ActiveMQ持久化有三种方式:AMQ、KahaDB、JDBC。
    AMQ
    AMQ是一种文件存储形式,它具有写入速度快和容易恢复的特点。消息存储在一个个文件中,文件的默认大小为32M,如果一条消息的大小超过了32M,那么这个值必须设置大一点。当一个存储文件中的消息已经全部被消费,那么这个文件将被标识为可删除,在下一个清除阶段,这个文件被删除。AMQ适用于ActiveMQ5.3之前的版本。
    KahaDB
    KahaDB是基于文件的本地数据库储存形式,虽然没有AMQ的速度快,但是它具有强扩展性,恢复的时间比AMQ短,从5.4版本之后KahaDB做为默认的持久化方式。
    JDBC
    可以将消息存储到数据库中,例如:Mysql、SQL Server、Oracle、DB2。
    具体使用方式大家下去查一下,限于篇幅在此就不做太详细的介绍。
    5.2 NON_PERSISTENT 非持久消息
    非持久的消息适用于不重要的,可以接受消息丢失的哪一类消息,这种消息只会被投递一次,消息不会在持久性存储中存储,也不会保证消息丢失后的重新投递。
    DeliveryMode.NON_PERSISTENT 不要求JMS provider持久保存消息,消息存放在内存中,读写速度快,在JMS服务停止后消息会消失,没有持久化到硬盘。

  2. ActiveMQ消息过期设置
    允许消息过期 。默认情况下,消息永不会过期。如果消息在特定周期内失去意义,那么可以设置过期时间。
    有两种方法设置消息的过期时间,时间单位为毫秒:
    1)使用 setTimeToLive 方法为所有的消息设置过期时间;
    2)使用 send 方法为每一条消息设置过期时间。
    消息过期时间,send 方法中的 timeToLive 值加上发送时刻的 GMT 时间值。如果 timeToLive 值等于零,则 JMSExpiration 被设为零,表示该消息永不过期。如果发送后,在消息过期时间之后消息还没有被发送到目的地,则该消息被清除。
    文章来源于网络。
    感谢大家阅读,欢迎大家私信讨论。给大家推荐一个Java技术交流群:473984645里面会分享一些资深架构师录制的视频资料:有Spring,MyBatis,Netty源码分析,高并发、高性能、分布式、微服务架构的原理,JVM性能优化、分布式架构等这些成为架构师必备的知识体系。还能领取免费的学习资源,目前受益良多!
    推荐大家阅读:
    Java高级架构学习资料分享+架构师成长之路​
    个人整理了更多资料以PDF文件的形式分享给大家,需要查阅的程序员朋友可以来免费领取。还有我的学习笔记PDF文件也免费分享给有需要朋友!

上一篇下一篇

猜你喜欢

热点阅读