个人学习vanishalone微服务搭建

RabbitMQ 消息确认机制

2021-05-30  本文已影响0人  SheHuan

之前的文章我们已经介绍了 RabbitMQ 的基本使用,但是在默认情况下 RabbitMQ 并不能保证消息是否发送成功、以及是否被成功消费掉。消息在传递过程中存在丢失的可能。基于这样的现状,就有了消息的确认机制,来提高消息传递过程中的可靠性。

RabbitMQ 中,消息的确认机制包含以下两个方面:

一、准备环境

创建 SpringBoot 项目,添加 RabbitMQ 依赖。

这里将生产者和消费者放在一个项目。

application.properties中添加连接 RabbitMQ 服务的配置,以及开启消息确认机制需要的配置:

server.port=8080
# rabbitmq 相关配置
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=123456
spring.rabbitmq.virtual-host=/
# 开启消息是否已经发送到交换机的确认机制
spring.rabbitmq.publisher-confirm-type=correlated
# 开启消息未成功投递到目标队列时将消息返回
spring.rabbitmq.publisher-returns=true
# 设置消费者需要手动确认消息
spring.rabbitmq.listener.simple.acknowledge-mode=manual
spring.rabbitmq.listener.direct.acknowledge-mode=manual

创建交换机、队列,并完成绑定:

@Configuration
public class AckRabbitMQConfig {
    // Fanout交换机
    @Bean
    FanoutExchange ackExchange() {
        return new FanoutExchange("ack.exchange", true, false);
    }

    // 消息队列
    @Bean
    Queue ackQueue() {
        return new Queue("ack.queue", true);
    }

    // 绑定队列和交换机
    @Bean
    Binding ackBinding() {
        return BindingBuilder.bind(ackQueue()).to(ackExchange());
    }
}

二、消息发送确认

消息发送确认的第一部分,是确认消息是否已经成功发送到交换机,我们需要实现RabbitTemplate.ConfirmCallback接口:

@Service
public class ConfirmCallbackService implements RabbitTemplate.ConfirmCallback {
    /**
     * @param correlationData
     * @param ack true 表示消息成功发送到交换机,false 则发送失败
     * @param cause 消息发送失败的原因
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if (ack) {
            System.out.println("消息已经发送到交换机!");
        } else {
            System.out.println("消息发送到交换机失败:" + cause);
        }
    }
}

消息无论是否成功到达交换机都会调用confirm方法。

消息发送确认的第二部分,就是消息是否成功的从交换机投放到目标队列,需要实现RabbitTemplate.ReturnsCallback接口:

@Service
public class ReturnCallbackService implements RabbitTemplate.ReturnsCallback {
    @Override
    public void returnedMessage(ReturnedMessage returned) {
        System.out.println("未成功投递到队列的消息:"+ returned.toString());
    }
}

returnedMessage方法只会在消息未成功投递到目标队列时被调用ReturnedMessage就是投递失败的消息基本信息。

定义好了两种消息发送确认服务,接下来就是配置消息发送确认服务,可以放在 RabbitMQ 配置类里进行全局配置:

@Configuration
public class AckRabbitMQConfig {
    @Autowired
    RabbitTemplate rabbitTemplate;

    @Autowired
    ConfirmCallbackService confirmCallbackService;

    @Autowired
    ReturnCallbackService returnCallbackService;

    @PostConstruct
    public void init(){
        rabbitTemplate.setConfirmCallback(confirmCallbackService);
        rabbitTemplate.setReturnsCallback(returnCallbackService);
    }
    ......
    ......
}

也可以在发送消息时单独配置:

@Service
public class SendMessageService {
    @Autowired
    RabbitTemplate rabbitTemplate;

    @Autowired
    ConfirmCallbackService confirmCallbackService;

    @Autowired
    ReturnCallbackService returnCallbackService;

    public void send(String message) {
        rabbitTemplate.setConfirmCallback(confirmCallbackService);
        rabbitTemplate.setReturnsCallback(returnCallbackService);
        rabbitTemplate.convertAndSend("ack.exchange", "", message);
        System.out.println("生产者发送的消息:" + message);
    }
}

三、消息接收确认

消息接收确认的实现就相对简单一些:

@Service
public class ReceiveMessageService {
    @RabbitListener(queues = "ack.queue")
    public void receive(String msg, Channel channel, Message message) {
        try {
            // int i = 1/0;
            // 确认收到消息
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
            System.out.println("消费者确认收到消息:" + msg);
        } catch (Exception e) {
            try {
                // 拒绝消息
                channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
                System.out.println("消费者拒绝消息:" + msg);
            } catch (IOException ioException) {
                ioException.printStackTrace();
            }
        }
    }
}

使用消息接收的手动确认模式时,接收消息的方法需要额外添加ChannelMessage两个类型的参数。

Channel就是信道,在学习 Java client 操作 RabbitMQ 时,就是用它来发送接收消息的,不了解的可以复习一下。Message是 RabbitMQ 封装的消息类,里边包含了消息体、消息序号、以及交换机、队列等一些相关的信息。

这样我们就可以根据实际的业务需求,在适当的时机告诉 RabbitMQ 服务,消息已经成功消费,或者被拒绝消费。

这就涉及如下几个方法了:

这里有两个问题需要注意:

1、

如果拒绝消息时,设置requeuetrue,由于消息会重新进入队列头部,接下来又会被消费者处理,这样很可能陷入死循环,耗尽服务器资源,很危险的。所以在设置requeuetrue时,需要慎重考虑。

拒绝消息时一般都是由于发生异常、或者业务上的错误,导致消费流程不能正常进行下去,可以考虑将此时的消息发送到死信队列,后续再单独处理。具体怎么实现,后期会有专门的文章介绍,目前先了解即可。

2、

如果开启了消息接收的手动确认模式,但是消费消息时却没有做任何消息确认成功或拒绝的应答操作,则对应的消息会变成Unacked状态:

如果消费者客户端不重启,则Unacked状态的消息会一直堆积,不会被删除,也不会被重新消费。

如果消费者客户端重启,则消息会自动变为Ready状态,这样又会被重新消费一次。

三、效果测试

可以通过如下接口来发送消息:

@RestController
public class SendMessageController {
    @Autowired
    private SendMessageService sendMessageService;

    @GetMapping("/send/{msg}")
    public void send(@PathVariable("msg") String msg) {
        sendMessageService.send(msg);
    }
}

要测试消息不能成功发送到交换机的情况,只需要发送消息时指定一个不存在的交换机即可。

由于RabbitTemplate.ReturnsCallbackreturnedMessage方法只会在消息未成功投递到目标队列时被调用,所以要测试消息是否成功的从交换机投放到目标队列,可以注释掉AckRabbitMQConfig中交换机和队列绑定的代码,或者在后台进行交换机和队列的解绑:

这样消息自然不能成功的从交换机投放到队列。

至于消息接收确认,可以自行模拟不同的业务场景测试。

本文完!

上一篇下一篇

猜你喜欢

热点阅读