IT必备技能MQ

RabbitMQ消息确认、消息持久化等核心知识总结

2019-06-19  本文已影响332人  wangzaiplus

本文参考石杉的架构笔记公众号, 总结了一下RabbitMQ的核心知识, 非常感谢

一、消息中间件选型

1. ActiveMQ:
2. RabbitMQ:
3. RocketMQ:
4. Kafka:

二、消息中间件的常见使用场景

三、系统架构引入消息中间件后会有哪些缺点

四、消息发送确认

生产者发送消息, 先发送消息到Exchange, 然后Exchange再路由到Queue, 这中间就需要确认两个事情

spring提供了两个回调函数来处理这两种消息发送确认

1. 确认消息是否成功发送到Exchange

有2种方式, 一种是重量级的事务消息机制。采用类事务的机制把消息投递到MQ,可以保证消息不丢失,但是性能极差,经过测试性能会呈现几百倍的下降。

所以说现在一般是不会用这种过于重量级的机制,而是会用轻量级的confirm机制。

另一种方式是confirm机制, 跟手动ack机制类似, 生产者将消息投递到RabbitMQ, 且将消息持久化到硬盘后, RabbitMQ会通过一个回调方法将confirm信息回传给生产端, 这样, 如果生产端的服务接收到了这个confirm消息,就知道是已经持久化到磁盘了。否则如果没有接收到confirm消息,那么就说明这条消息可能半路丢失了,此时你就可以重新投递消息到MQ去,确保消息不会丢失。

1.1 通过AMQP的事务机制可以保证消息发送确认
事务机制主要是通过对channel的设置实现

channel.txSelect();// 声明启动事务模式
channel.txComment();// 提交事务
channel.txRollback();// 回滚事务

1.2 使用confirm确认机制
实现ConfirmCallback并重写confirm回调方法, 消息发送到Broker后触发回调, 可以确认消息是否成功发送到Exchange

application.properties:

# 开启confirms回调 P -> Exchange
spring.rabbitmq.publisher-confirms=true

回调:

        // 消息是否成功发送到Exchange
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
            if (ack) {
                log.info("消息成功发送到Exchange");
            } else {
                log.info("消息发送到Exchange失败: cause: {}", correlationData, cause);
            }
        });
2. 确认消息是否从Exchange成功路由到Queue

实现ReturnCallback并重写returnedMessage回调方法, 可以确认消息从EXchange路由到Queue失败, 注意: 这里的回调是一个失败回调, 只有消息从Exchange路由到Queue失败才会回调这个方法

application.properties:

# 开启returnedMessage回调 Exchange -> Queue
spring.rabbitmq.publisher-returns=true
# 触发returnedMessage回调必须设置mandatory=true, 否则Exchange没有找到Queue就会丢弃掉消息, 而不会触发回调
spring.rabbitmq.template.mandatory=true

回调:

        // 消息是否从Exchange路由到Queue, 注意: 这是一个失败回调, 只有消息从Exchange路由到Queue失败才会回调这个方法
        rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
            log.info("消息从Exchange路由到Queue失败: exchange: {}, route: {}, replyCode: {}, replyText: {}, message: {}", exchange, routingKey, replyCode, replyText, message);
        });

五、消息接收确认

消息怎样才算消费成功?

RabbitMQ默认自动确认(ack)消息被正确消费, 即消息投递到消费者后就自动确认消息被处理完毕, 并且会将该消息删除, 即使消费者意外宕机, 或者抛出异常, 如果消费者接收到消息, 还没处理完成就down掉或者抛出异常, 那么, 这条消息就丢失了

分析一下问题出在哪, 问题出在RabbitMQ只管消息投递出去, 而不管消息是否被正确处理就自动删除消息, 所以, 只要将自动ack修改为手动ack, 消费成功才通知RabbitMQ可以删除该消息即可, 如果消费者宕机, 消费失败, 由于RabbitMQ并未收到ack通知, 且感知到该消费者状态异常(如抛出异常), 就会将该消息重新推送给其他消费者, 让其他消费者继续执行, 这样就保证消费者挂掉但消息不会丢失

消息确认模式有:

默认情况下消息消费者是自动ack(确认)消息的, 如果要手动ack(确认), 则需要修改确认模式为manual

application.properties:

# 设置手动确认(ack) Queue -> C
spring.rabbitmq.listener.simple.acknowledge-mode=manual

消费消息并手动确认:

@Component
@Slf4j
public class LogUserConsumer {

    @Autowired
    UserLogService userLogService;

    @RabbitListener(queues = "log.user.queue")
    public void logUserConsumer(Message message, Channel channel, @Header (AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
        try {
            log.info("收到消息: {}", message.toString());
            userLogService.insert(MessageHelper.msgToObj(message, UserLog.class));
        } catch (Exception e){
            log.error("logUserConsumer error", e);
            channel.basicNack(tag, false, true);
        } finally {
            channel.basicAck(tag, false);
        }
    }

}

六、消息持久化

消息被投递到RabbitMQ的内存中, 还没投递到消费者实例之前宕机了, 消息不就丢失了?

可以进行消息持久化, 将Exchange、queue和message都持久化到硬盘, 这样, RabbitMQ重启时, 会把持久化的Exchange、queue和message从硬盘重新加载出来, 重新投递消息

1.1 Exchange的持久化, 声明交换机时指定持久化参数为true即可

    @Bean
    public DirectExchange logUserExchange() {
        return new DirectExchange("log.user.exchange", true, false);
    }

第二个参数durable: 是否持久化, 第三个参数autoDelete: 当所有绑定队列都不再使用时, 是否自动删除交换器, true: 删除, false: 不删除

1.2 queue的持久化, 声明队列时指定持久化参数为true即可

    @Bean
    public Queue logUserQueue() {
        return new Queue("log.user.queue.name", true);
    }

第二个参数durable, 是否持久化

1.3 message的持久化, 是通过配置deliveryMode实现的, 生产者投递时, 指定deliveryModeMessageDeliveryMode.PERSISTENT即可实现消息的持久化, 投递和消费都需要通过Message对象进行交互, 为了不每次都写配置转换的代码, 我们写一个消息帮助类MessageHelper:

public class MessageHelper {

    public static Message objToMsg(Object obj) {
        if (null == obj) {
            return null;
        }

        Message message = MessageBuilder.withBody(JsonUtil.objToStr(obj).getBytes()).build();
        message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);// 消息持久化
        message.getMessageProperties().setContentType(MessageProperties.CONTENT_TYPE_JSON);

        return message;
    }

    public static <T> T msgToObj(Message message, Class<T> clazz) {
        if (null == message || null == clazz) {
            return null;
        }

        String str = new String(message.getBody());
        T obj = JsonUtil.strToObj(str, clazz);

        return obj;
    }

}

消息投递时:

rabbitTemplate.convertAndSend("log.user.exchange.name", "log.user.routing.key.name", MessageHelper.objToMsg(userLog));

消息消费时(参考五、消息接收确认):

UserLog userLog = MessageHelper.msgToObj(message, UserLog.class);

如果不需要消息持久化, 则不需要通过Message进行转换, 可以直接通过字符串或者对象投递和消费

七、unack消息的积压问题

什么叫unack消息的积压问题, 简单来说就是消费者处理能力有限, 无法一下将MQ投递过来的所有消息消费完, 如果MQ推送消息过多, 比如可能有几千上万条消息积压在某个消费者实例内存中, 此时这些积压的消息就处于unack状态, 如果一直积压, 就有可能导致消费者服务实例内存溢出、内存消耗过大、甚至内存泄露

所以, RabbitMQ是必须要考虑一下消费者服务的处理能力的。

如何解决?

RabbitMQ基于一个prefetch count来控制这个unack message的数量。

你可以通过 “channel.basicQos(10)” 这个方法来设置当前channel的prefetch count。也可以通过配置文件设置: spring.rabbitmq.listener.simple.prefetch=10

举个例子,比如你要是设置为10的话,那么意味着当前这个channel里,unack message的数量不能超过10个,以此来避免消费者服务实例积压unack message过多。

这样的话,就意味着RabbitMQ正在投递到channel过程中的unack message,以及消费者服务在处理中的unack message,以及异步ack之后还没完成ack的unack message,所有这些message加起来,一个channel也不能超过10个。

如果你要简单粗浅的理解的话,也大致可以理解为这个prefetch count就代表了一个消费者服务同时最多可以获取多少个message来处理。

prefetch就是预抓取的意思,就意味着你的消费者服务实例预抓取多少条message过来处理,但是最多只能同时处理这么多消息。

如果一个channel里的unack message超过了prefetch count指定的数量,此时RabbitMQ就会停止给这个channel投递消息了,必须要等待已经投递过去的消息被ack了,此时才能继续投递下一个消息。

设置多大合理?

RabbitMQ官方给出的建议是prefetch count一般设置在100 - 300之间。也就是一个消费者服务最多接收到100 - 300个message来处理,允许处于unack状态。

这个状态下可以兼顾吞吐量也很高,同时也不容易造成内存溢出的问题。

八、总结

# rabbitmq
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
# 开启confirms回调 P -> Exchange
spring.rabbitmq.publisher-confirms=true
# 开启returnedMessage回调 Exchange -> Queue
spring.rabbitmq.publisher-returns=true
# 触发returnedMessage回调必须设置mandatory=true, 否则Exchange没有找到Queue就会丢弃掉消息, 而不会触发回调
spring.rabbitmq.template.mandatory=true
# 设置手动确认(ack) Queue -> C
spring.rabbitmq.listener.simple.acknowledge-mode=manual
spring.rabbitmq.listener.simple.prefetch=100
上一篇 下一篇

猜你喜欢

热点阅读