项目实战- rabbitmq 可靠性投递springcloud

rabbitmq 可靠性投递(四)之实现可靠性投递

2019-01-18  本文已影响144人  HmilyMing
前言

在之前介绍了可靠性投递方案和项目搭建::https://juejin.im/post/5c3f43dae51d45731470a18c

唯一ID:https://juejin.im/post/5c3eb6e7518825253b5ea545

前期的一些准备:https://juejin.im/post/5c3f4bafe51d4551ec60b521

先给出完整代码:
https://github.com/hmilyos/common.git 
https://github.com/hmilyos/snowFlakeDemo.git
https://github.com/hmilyos/rabbitmq-common.git       available 分支

先来回顾一下我们可靠性投递的流程


image

流程的示意图如上所示,比如我下单成功了,这时进行 step1,对我的业务数据进行入库,业务数据入库完毕(这里要特别注意一定要保证业务数据入库)再对要发送的消息进行入库,图中采用了两个数据库,可以根据实际业务场景来确定是否采用两个数据库,如果采用了两个数据库,有人可能就像到了采用分布式事务来保证数据的一致性,但是在大型互联网中,基本很少采用事务,都是采用补偿机制。
对业务数据和消息入库完毕就进入 setp2,发送消息到 MQ 服务上,按照正常的流程就是消费者监听到该消息,就根据唯一 id 修改该消息的状态为已消费,并给一个确认应答 ack 到 Listener。如果出现意外情况,消费者未接收到或者 Listener 接收确认时发生网络闪断,接收不到,这时候就需要用到我们的分布式定时任务来从 msg 数据库抓取那些超时了还未被消费的消息,重新发送一遍。重试机制里面要设置重试次数限制,因为一些外部的原因导致一直发送失败的,不能重试太多次,要不然会拖垮整个服务。例如重试三次还是失败的,就把消息的 status 设置成 发送失败,然后通过补偿机制,人工去处理。实际生产中,这种情况还是比较少的,但是你不能没有这个补偿机制,要不然就做不到可靠性了。

下面就让我们用代码来实现这个方案吧。

1. 简单的创建订单接口
@RestController
public class OrderController {

    @Autowired
    private IMessageService messageService;

    @GetMapping("/createOrder")
    public ServerResponse createOrder(long userId){
        return messageService.createOrder(userId);
    }
}
2. IMessageService 接口以及实现类
public interface IMessageService {

    @Transactional
    ServerResponse createOrder(long userId);
    
}
@Service
public class MessageServiceImpl implements IMessageService {

    private final  static Logger log = LoggerFactory.getLogger(MessageServiceImpl.class);

    @Autowired
    private ISnowFlakeService snowFlakeService;

    @Autowired
    private MessageMapper messageMapper;
    @Autowired
    private RabbitOrderSender rabbitOrderSender;

    @Override
    public ServerResponse createOrder(long userId) {
//        首先是针对业务逻辑,进行下单的业务,保存到数据库后
//        业务落库后,再对消息进行落库,
        long msgId = snowFlakeService.getSnowFlakeID();
        Message message = new Message(msgId, TypeEnum.CREATE_ORDER.getCode(), userId + "创建订单:" + msgId,
                0, MSGStatusEnum.SENDING.getCode(), DateUtils.addMinutes(new Date(), Constants.TRY_TIMEOUT));
        int row = messageMapper.insertSelective(message);
        if (row == 0){
            throw new CustomException(500, "消息入库异常");
        }
//        消息落库后就可以发送消息了
        try {
            rabbitOrderSender.sendOrder(message);
        } catch (Exception e) {
//          因为业务已经落库了
//          所以 即使发送失败也不影响,因为可靠性投递,我回去再次尝试发送消息
            log.error("sendOrder mq msg error: ", e);
            messageMapper.updataNextRetryTimeForNow(message.getMessageId());
        }
        return ServerResponse.createBySuccess();
    }

}

注意了,我这里是直接拿消息的实体当做业务去落库了,实际上应该是先对订单实体落库,然后再对消息实体落库,最后发送消息!

3. 发送消息的具体实现 RabbitOrderSender

具体代码如下,注释也写得很详细了:


@Component
public class RabbitOrderSender {

    private static final Logger log = LoggerFactory.getLogger(RabbitOrderSender.class);

    //自动注入RabbitTemplate模板类
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Autowired
    private MessageMapper messageMapper;

    @Value("${order.rabbitmq.listener.order.exchange.name}")
    private String exchangeName;

    @Value("${order.rabbitmq.send.create.key}")
    private String routingKey;

    //发送消息方法调用: 构建自定义对象消息
    public void sendOrder(Message message) throws Exception {
        rabbitTemplate.setConfirmCallback(confirmCallback);
        rabbitTemplate.setReturnCallback(returnCallback);

        CorrelationData correlationData = new CorrelationData(message.getMessageId() + "");
        rabbitTemplate.convertAndSend(exchangeName, routingKey, message, correlationData);
//        throw new CustomException("--test--");
    }

    //回调函数: confirm确认
    final RabbitTemplate.ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() {
        @Override
        public void confirm(CorrelationData correlationData, boolean ack, String cause) {
            log.info("correlationData: {}", correlationData);
            String messageId = correlationData.getId();
            if(ack){
                //如果confirm返回成功 则进行更新
                messageMapper.changeMessageStatus(Long.parseLong(messageId), MSGStatusEnum.SEND_SUCCESS.getCode());
            } else {
                //失败则进行具体的后续操作:重试 或者补偿等手段
                log.error("消息发送失败,需要进行异常处理...");
                messageMapper.updataNextRetryTimeForNow(Long.parseLong(messageId));
            }
        }
    };

    //回调函数: return返回, 这里是预防消息不可达的情况,比如 MQ 里面没有对应的 exchange、queue 等情况,
//    如果消息真的不可达,那么就要根据你实际的业务去做对应处理,比如是直接落库,记录补偿,还是放到死信队列里面,之后再进行落库
//    这里脱开实际业务场景,不大好描述
    final RabbitTemplate.ReturnCallback returnCallback = new RabbitTemplate.ReturnCallback() {
        @Override
        public void returnedMessage(org.springframework.amqp.core.Message message, int replyCode,
                                    String replyText, String exchange, String routingKey) {
            log.info("return exchange: {}, routingKey: {}, replyCode: {}, replyText: {}",
                    exchange, routingKey, replyCode, replyText);
        }
    };
}
4. 消费端代码 RabbitOrderReceiver
@Component
public class RabbitOrderReceiver {

    private static final Logger log = LoggerFactory.getLogger(RabbitOrderReceiver.class);

    @Autowired
    private MessageMapper messageMapper;

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = "${order.rabbitmq.listener.order.queue.name}",
                    durable="${order.rabbitmq.listener.order.queue.durable}"),
            exchange = @Exchange(value = "${order.rabbitmq.listener.order.exchange.name}",
                    durable="${order.rabbitmq.listener.order.exchange.durable}",
                    type= "${order.rabbitmq.listener.order.exchange.type}",
                    ignoreDeclarationExceptions = "${order.rabbitmq.listener.order.exchange.ignoreDeclarationExceptions}"),
            key = "${order.rabbitmq.listener.order.key}"
    )
    )
    @RabbitHandler
    public void onOrderMessage(@Payload com.hmily.rabbitmq.rabbitmqcommon.entity.Message msg,
                               Channel channel,
                               @Headers Map<String, Object> headers) throws Exception {
        log.info("-----------------RabbitOrderReceiver---------------------");
        log.info("消费端 order msg: {} ",  msg.toString());
        msg.setStatus(MSGStatusEnum.PROCESSING_IN.getCode());
        int row = messageMapper.updateByPrimaryKeySelective(msg);
        if (row != 0) {
            Thread.sleep(2000L);
            Long deliveryTag = (Long)headers.get(AmqpHeaders.DELIVERY_TAG);
            //手工ACK
            channel.basicAck(deliveryTag, false);
//            接着去执行你对应的业务逻辑,
//            注意,这是可靠性投递,执行业务逻辑一定要做幂等性
        }

    }
}
注意:我这里并没有做幂等性去重,实际业务时必须做幂等性!!

6.我们的定时重试机制 SendMessageTask
@Component
public class SendMessageTask {
    private static final Logger log = LoggerFactory.getLogger(SendMessageTask.class);

    @Autowired
    private MessageMapper messageMapper;
    @Autowired
    private IMessageFailedService messageFailedService;

    @Autowired
    private RabbitOrderSender rabbitOrderSender;

    @Scheduled(initialDelay = 3000, fixedDelay = 10000)
    public void reSend(){
        log.info("---------------定时任务开始---------------");
        List<Message> msgs = messageMapper.getNotProcessingInByType(TypeEnum.CREATE_ORDER.getCode(), null, 
                new int[]{MSGStatusEnum.SENDING.getCode()});
        msgs.forEach(msg -> {
            if (msg.getTryCount() >= Constants.MAX_TRY_COUNT) {
//              如果重试次数大于最大重试次数就不再重试,记录失败
                msg.setStatus(MSGStatusEnum.SEND_FAILURE.getCode());
                msg.setUpdateTime(new Date());
                messageMapper.updateByPrimaryKeySelective(msg);
                MessageFailed failed = new MessageFailed(msg.getMessageId(), "消息发送失败", "已达到最大重试次数,但是还是发送失败");
                messageFailedService.add(failed);
            } else {
//              未达到最大重试次数,可以进行重发消息
//              先改一下消息记录,保存好再发送消息
                msg.setNextRetry(DateUtils.addMinutes(new Date(), Constants.TRY_TIMEOUT));
                int row = messageMapper.updateTryCount(msg);
                try {
                    rabbitOrderSender.sendOrder(msg);
                } catch (Exception e) {
                    log.error("sendOrder mq msg error: ", e);
                    messageMapper.updataNextRetryTimeForNow(msg.getMessageId());
                }
            }
        });
    }
}

至此,我们的可靠性投递就完成了。

上一篇 下一篇

猜你喜欢

热点阅读