JMS如何保证消息的可靠性?

2021-09-21  本文已影响0人  蝌蚪1573

JMS如何保证消息的可靠性?

JMS是通过持久化(Persistent)、事务(Transaction)、 签收来(acknowledge)保证的

image.png

1、持久化

首先消息的持久化是在消息的生产者中设置的,由消息的生产者告诉消息中间件是否进行消息的持久化,如果生产者端没有设置,默认是持久化的
通过session创建出来的生产者生产的Queue消息为持久性

设置通过session创建出来的生产者生产的Queue消息为持久性
messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);//非持久化 messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);//持久化

生产者代码实现:

public class JmsProducer {

   public static final String ACTIVEMQ_URL="tcp://192.168.137.199:61616";
   public static final String QUEUE_ANME="queue01";

   public static void main(String[] args) throws JMSException {
       //1.创建连接工厂
       ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory("admin","admin",ACTIVEMQ_URL);
       //2.通过连接工厂,获得连接connection并启动访问
       Connection connection = activeMQConnectionFactory.createConnection();
       connection.start();
       //3.创建会话session
       // 两个参数:事务/签收
       Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
       //4.创建目的地(队列或主题topic)
//        Destination destination = session.createQueue(QUEUE_ANME);
       Queue queue = session.createQueue(QUEUE_ANME);
       //5.创建消息的生产者
       MessageProducer messageProducer = session.createProducer(queue);
//        messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);//非持久化
//        messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);//持久化
       //6 通过使用messageProducer生产3条消息发送到MQ的队列里面
       for (int i = 1; i <=3; i++) {
           //7创建消息
//            TextMessage textMessage = session.createTextMessage("msg--" + i);
           TextMessage textMessage = session.createTextMessage("textMessage msg--" + i);
           //8.通过messageProducer发送给mq
           messageProducer.send(textMessage);
       }
       //9.关闭资源
       messageProducer.close();
       session.close();
       connection.close();
       System.out.println("MQ消息发布成功");

   }
}

消费者代码实现:

public class JmsConsumer {
    public static final String ACTIVEMQ_URL="tcp://192.168.137.199:61616";
    public static final String QUEUE_ANME="queue01";

    public static void main(String[] args) throws JMSException, IOException {
        System.out.println("*****我是2号消费者");

        //1.创建连接工厂
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory("admin","admin",ACTIVEMQ_URL);
        //2.通过连接工厂,获得连接connection并启动访问
        Connection connection = activeMQConnectionFactory.createConnection();
        connection.start();
        //3.创建会话session
        // 两个参数:事务/签收
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //4.创建目的地(队列或主题topic)
//        Destination destination = session.createQueue(QUEUE_ANME);
        Queue queue = session.createQueue(QUEUE_ANME);
        //5.创建消费者
        MessageConsumer messageConsumer = session.createConsumer(queue);

        while (true){
            TextMessage message = (TextMessage) messageConsumer.receive(20000L);
            if(null != message){
                System.out.println("TX消费者接收到消息:"+message.getText());
//                message.acknowledge();
            }else {
                break;
            }
        }

//        System.in.read();//保证控制台不关闭
        messageConsumer.close();
  //      session.commit();
        session.close();
        connection.close();

    }
}

持久化消息是队列的默认传递模式,此模式保证这些消息只被传送一次和成功使用一次。对于这些消息,可靠性是优先考虑的因素。

可靠性的另一个重要方面是确保持久性消息传送至目标后,消息服务在向消费者传送它们之前不会丢失这些消息。

2、事务

消息生产者偏重于事务,开启事务后生产者生产的消息,只有在session执行commit后才会被提交到服务器,不然此次提交记录无效。事务开启的意义在于,如果对于多条必须同批次传输的消息,可以使用事务,如果一条传输失败,可以将事务回滚,再次传输,保证数据的完整性。

如果开启事务,事务优先级>签收;

//这边的第一个参数true表示事务开启
Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
//事务如果是true:  需要commit
session.commit();

3、签收

消费者偏重于签收
签收主要针对于消费者,而且一般在非事务的情况下生效,因为开启事务后,签收与否取决于事务的提交或回滚,与签收设置的方式无关。

签收方式有4种,但平时常用的有两种:

  1. 自动签收
Session.AUTO_ACKNOWLEDGE;//自动签收
  1. 手动签收
Session.CLIENT_ACKNOWLEDGE; //手动签收
message.acknowledge();  // 手动签收需要ack
上一篇 下一篇

猜你喜欢

热点阅读