4-1 编码实现消息99%投递保障

2020-07-16  本文已影响0人  Finlay_Li

什么是消息的可靠性传递?

  1. 保障消息的成功发出。
  2. 保障MQ节点的成功接收。
  3. 发送端收到MQ节点(Broker)的应答。
  4. 完善的消息补偿机制。

生产端—可靠性投递解决方案

方案一:消息落库,対消息状态进行打标

image.png

方案二:消息的延迟投递,做二次确认,回调补偿

image.png

场景:下单 ---- 支付 ---- 活动奖励

  1. 完成订单的业务数据入库、这部分为了提高性能,也可以用MQ,结合其Confirm机制即可,成就是成失败就是失败
  2. 当用户支付完成,平台各种活动的业务,就要考虑保障消息100%的投递成功,因为支付都完成了,不可能把钱又退回给用户
  3. step1 支付成功后,生产端发送业务的消息到Broker
  4. step2 同时生产端发送一条相同业务的延迟消息(Second Send Delay Check)到Broker,需要设置延迟时间如1~2分钟
  5. step3 消费端对收到的业务消息进行逻辑处理
  6. step4 无论逻辑的处理是成功/失败,消费端处理完毕之后,再发送一条Confirm消息(不是ACK)到Broker
  7. step5 Callback service是一个单独的服务,它通过Broker去监听消费端发送的Confirm消息
    这时有两种可能:
    1. 收到消息,那么将消息持久化到DB当中(无论Confirm消息状态是成功的/失败的)
    2. 没收到消息,那么 Callback service 的 Listener Confirm 就不工作,即Msg DB 不会产生数据、如果Confirm消息是状态是成功的,那么Callback service就不能当失败来处理,因为未发出消息可能是网络、磁盘已满等情况导致的。因此Confirm消息状态是成功时,应记录到Redis中。
  8. step6 这时延迟消息已投递给Broker, Callback Service去监听延迟消息所对应的队列.收到之后去检查MSG DB中是否有这条消息
    1. 如果存在, 业务状态是处理成功的——>通过.
    2. 如果存在, 业务状态是处理失败的——>那么Callback Service就需要主动发起RPC通信给上游服务,告诉它延迟投递的这条消息没有找到,需要重新发送。生产端收到信息后就会重新查询业务消息然后将消息发送出去,循环第一步。
    3. 如果不存在,从延迟消息中取出关键信息如唯一标识,去Redis查,看有没数据,如果有则是成功的——>通过。如果Redis查不到,则说明消费端没有处理成功,也没成功发送消息,那么Callback Service就需要主动发起RPC通信给上游服务
  9. step7 如果Callback service对 Msg DB更新失败的 或者 期间有什么非业务的异常,而是错误。则可发送通知给到管理员。手动处理该次业务并修复错误!

生产端&消费端

https://github.com/Liwh-yami/RabbitMQ/tree/master/src/main/java/com/finlay/scaffold/reliable

Callback 服务

package com.quanwugou.mall.mq;


import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.quanwugou.mall.dao.model.BrokerMsgLog;
import com.quanwugou.mall.mq.model.ActivityResult;
import com.quanwugou.mall.service.BrokerMsgLogService;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.util.Date;

@Component
public class ConfirmReceiver {
    private static final String CONFIRM_QUEUE_NAME = "pay.confirm.queue";

    @Autowired
    private BrokerMsgLogService brokerMsgLogService;

    @RabbitListener(queues = CONFIRM_QUEUE_NAME)
    @RabbitHandler
    public void rec(String msg, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
        System.err.println("Callback service  Listener Confirm -----------begin--------");
        /*step5 Callback service 消费 Confirm消息*/
        ObjectMapper mapper = new ObjectMapper();
        ActivityResult activityResult = mapper.readValue(msg, ActivityResult.class);
        String activityId = activityResult.getActivityId();
        BrokerMsgLog msgLog = brokerMsgLogService.getOne(Wrappers.<BrokerMsgLog>query().eq("activity_id", activityId));
        Integer resultStatus = activityResult.getStatus();
        if (msgLog == null) {
            //首次
            BrokerMsgLog brokerMsgLog = new BrokerMsgLog();
            Date date = new Date();
            brokerMsgLog.setMessage(msg)
                    .setStatus(resultStatus)
                    .setActivityId(activityId)
                    .setUpdateTime(date)
                    .setCreateTime(date);
            brokerMsgLogService.save(brokerMsgLog);//不关心这个存储结果、延迟消息消费时,会判断Redis
        } else {
            //更新日志:最大重试次数
            int i = msgLog.getTryCount();
            System.err.println("Confirm check 最大重试次数:" + i);
            msgLog.setTryCount(i + 1);
            msgLog.setUpdateTime(new Date());
            brokerMsgLogService.updateById(msgLog);
        }

        channel.basicAck(tag, false);
        System.err.println("Callback service  Listener Confirm -----------end--------");
    }
}
package com.quanwugou.mall.mq;

import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.quanwugou.mall.dao.model.BrokerMsgLog;
import com.quanwugou.mall.mq.model.Activity;
import com.quanwugou.mall.mq.model.ActivityResult;
import com.quanwugou.mall.service.BrokerMsgLogService;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;

/**
 * @author: Finlay
 * @description:
 * @date: 2020-07-17 5:46 下午
 */
@Component
public class DelayedReceiver {
    private static final String DELAYED_QUEUE_NAME = "pay.delayed.queue";

    @Autowired
    private BrokerMsgLogService brokerMsgLogService;
    @Autowired
    private RedisTemplate redisTemplate;

    @RabbitListener(queues = DELAYED_QUEUE_NAME)
    @RabbitHandler
    public void rec(Message msg, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
        SimpleDateFormat sf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        System.err.println("接收到延迟消费时间:" + sf.format(new Date()));

        ObjectMapper mapper = new ObjectMapper();

        Activity activity = mapper.readValue(msg.getBody(), Activity.class);
        String activityId = activity.getId();
        BrokerMsgLog msgLog = brokerMsgLogService.getOne(Wrappers.<BrokerMsgLog>query().eq("activity_id", activityId));
        if (msgLog != null) {
            Integer status = msgLog.getStatus();
            if (status == 1) {
                channel.basicAck(tag, false);
            } else if (status == 0) {
                Integer tryCount = msgLog.getTryCount();
                if (tryCount >= 3) {
                    System.err.println("业务处理失败。。。。。。。请手动补偿奖励业务!");
                    channel.basicAck(tag, false);
                } else {
                    System.err.println("Delayed check 最大重试次数:" + tryCount);
                    //主动发起RPC通信给上游服务
                    System.err.println("主动发起RPC通信给上游服务---------Feign");
                    channel.basicAck(tag, false);
                }
            }
        } else {
            Object pop = redisTemplate.opsForSet().pop(activityId);
            if (pop == null) {
                //主动发起RPC通信给上游服务
                System.err.println("主动发起RPC通信给上游服务---------Feign");
                channel.basicAck(tag, false);
            } else {
                System.err.println("从redis中获取到业务成功通知---------更新日志---------业务结束!");
                //说明成功:ack权重更高,写前面
                channel.basicAck(tag, false);
                //日志处理
                ActivityResult result = new ActivityResult();
                result.setActivityId(activityId);
                result.setStatus(1);
                String resultStr = mapper.writeValueAsString(result);
                BrokerMsgLog brokerMsgLog = new BrokerMsgLog();
                Date date = new Date();
                brokerMsgLog.setMessage(resultStr)
                        .setStatus(1)
                        .setActivityId(activityId)
                        .setUpdateTime(date)
                        .setCreateTime(date);
                boolean save = brokerMsgLogService.save(brokerMsgLog);
                if (!save) {
                    System.err.println("业务处理成功,msgLog存储失败。。。。请手动补偿日志!");
                }
            }
        }
    }

}

测试结果

step6如果存在:失败


image.png
image.png

step6如果不存在:成功


image.png

存在错误或不足请不吝赐教~~~

上一篇 下一篇

猜你喜欢

热点阅读