程序猿之路分布式各组件

rabbitmq 两种模式下(事务/发布确认)性能对比

2018-09-05  本文已影响568人  0爱上1

如何保证消息成功发送?

在实际场景下,有的生产者发送的消息是必须保证成功发送到消息队列中,那么如何保证成功投递呢?

何为事务方式发送

事务方式:amqp协议提供的一种保证消息成功投递的方式
通过将信道开启 transactional 模式
并利用信道 Channel 的三个方式来实现以事务方式发送消息,若发送失败,通过异常处理回滚事务,确保消息成功投递

事务方式流程分析

  1. 代码实现

     @Test
     public void testTransactionalMode() {
     // 开启事务模式,模拟发送一万条消息,记录总耗时
     // 获取连接工厂
     ConnectionFactory connectionFactory = rabbitTemplate.getConnectionFactory();
    
     // 开启连接 - tcp连接
     Connection connection = connectionFactory.createConnection();
    
     // 建立信道 构造参数 true代表该信道开启 Transactional 事务模式
     // 此处传入true,就不需要再显示编码 channel.txSelect()了
     // 内部已经调用了channle.txSelect();
     Channel channel = connection.createChannel(true);
    
     // 准备发送一万条测试消息, 每条消息都会开启一个新的事务。
     long start = System.currentTimeMillis();
     for (int i = 0; i <= 10000; i++) {
         try {
             // channel.txSelect();
             channel.basicPublish("x-hello", "test", true, MessageProperties.PERSISTENT_BASIC, ("第" + (i + 1) + "条消息").getBytes());
    
             channel.txCommit();
         } catch (Exception e) {
             // 发生异常,说明消息没有到达broker的queue中,回滚。
             try {
                 log.error("提交事务失败,事务回滚 i = " + i);
                 channel.txRollback();
             } catch (IOException e1) {
                 log.error("mq broker error...");
             }
             log.error("mq broker error...");
         }
     }
     System.out.println("事务方式单消息单事务提交下,10000条消息发送共耗时: " + (System.currentTimeMillis() - start) + "ms");
    

    }

  2. 抓包分析


    消息投递-事务方式
  3. 流程分析

以上步骤中有关事务的地方发生错误,都会抛出IOException,只要在代码中捕获异常进行事务回滚即可,或者自行决定是否需要重发消息。

何为发布确认方式发送

既然有了事务方式保证消息的投递,那么为何还需要发布确认方式呢?答案是:性能

发布确认模仿了协议中已经存在的消费者ACK确认机制,生产者将信道设置成confirm模式,一旦信道进入confirm模式,所有在该信道上面发布的消息都会被指派一个唯一的ID,一旦消息被投递到所有匹配的队列之后,broker就会发送一个确认给生产者(包含消息的唯一ID),这就使得生产者知道消息已经正确到达目的队列了;如果消息和队列是可持久化的,那么确认消息会将消息写入磁盘之后发出

后续会放出事务模式和发送确认模式的实测性能对比

rabbitmq的发送确认方式,也是通过信道开启的

// 开启信道为确认模式
channel.confirmSelect();

而确认模式又分为同步等待mq服务器确认和异步等待确认两种。

配置文件

application.properties

当mandatory标志设置为true时,如果exchange根据自身类型和消息routingKey无法找到一个合适的queue存储消息,那么broker会调用basic.return方法将消息返还给生产者;当mandatory设置为false时,出现上述情况broker会直接将消息丢弃

性能对比

1000条消息发送耗时:18733ms

事务模式下

1000条信息耗时:387ms

异步发送确认方式

消息流转图

消息流转图

如何保证消息被成功消费

为了保证消息被可靠消费,即消息从队列中可靠的发送给消费者,需要有一定的机制保证。

RabbitMQ 提供了消息消费确认机制,即当消息到达消费者,消费者执行消费代码后,需要告知mq,该消息已经被成功消费。这就是消费确认机制

消费者在声明队列时,可以指定noAck参数,当noAck=false时,RabbitMQ会等待消费者显式发回ack信号后才从内存(和磁盘,如果是持久化消息的话)中移去消息。否则,RabbitMQ会在队列中消息被消费后立即删除它

手动确认消息消费配置

spring.rabbitmq.listener.simple.acknowledge-mode=manual

总结

事务方式发布消息,性能太差,往往不采用事务的方式发布消息,建议采用异步发送确认的方式

遗留问题

  • 如果在mq服务器异步通知过程中,由于网络原因或者mq正好准备回调就挂了,导致发布者没有收到确认发送的消息怎么办?
  • mq 迟迟未收到consumer的ack怎么处理?
上一篇下一篇

猜你喜欢

热点阅读