解决方案Amazing Arch分布式

分布式事务解决方案

2019-05-22  本文已影响174人  码道功臣

根据微服务架构的鼻祖 Martin Fowler 的忠告,微服务架构中应当尽量避免分布式事务。

分布式事务的讨论主要聚焦于强一致性和最终一致性的解决方案。

微服务的发展

微服务倡导将复杂的单体应用拆分为若干个功能简单、松耦合的服务,这样可以降低开发难度、增强扩展性、便于敏捷开发。当前被越来越多的开发者推崇,很多互联网行业巨头、开源社区等都开始了微服务的讨论和实践。

微服务落地存在的问题

虽然微服务现在如火如荼,但对其实践其实仍处于探索阶段。很多中小型互联网公司,鉴于经验、技术实力等问题,微服务落地比较困难。

如著名架构师Chris Richardson所言,目前存在的主要困难有如下几方面:

随着RPC框架的成熟,第一个问题已经逐渐得到解决。例如springcloud可以非常好的支持restful调用,dubbo可以支持多种通讯协议。

对于第三个问题,随着docker、devops技术的发展以及各公有云paas平台自动化运维工具的推出,微服务的测试、部署与运维会变得越来越容易。

而对于第二个问题,现在还没有通用方案很好的解决微服务产生的事务问题。分布式事务已经成为微服务落地最大的阻碍,也是最具挑战性的一个技术难题。

ACID

一致性理论

分布式事务的目的是保障分库数据一致性,而跨库事务会遇到各种不可控制的问题,如个别节点永久性宕机,像单机事务一样的 ACID 是无法奢望的。

另外,业界著名的 CAP 理论也告诉我们,对分布式系统,需要将数据一致性和系统可用性、分区容忍性放在天平上一起考虑。

两阶段提交协议(简称2PC)是实现分布式事务较为经典的方案,但 2PC 的可扩展性很差,在分布式架构下应用代价较大,eBay 架构师 Dan Pritchett 提出了 BASE 理论,用于解决大规模分布式系统下的数据一致性问题。

BASE 理论告诉我们:可以通过放弃系统在每个时刻的强一致性来换取系统的可扩展性。

CAP 理论

在分布式系统中,一致性(Consistency)、可用性(Availability)和分区容忍性(Partition Tolerance)3 个要素最多只能同时满足两个,不可兼得。其中,分区容忍性又是不可或缺的。

cap.png

举例:Cassandra、Dynamo 等,默认优先选择 AP,弱化 C;HBase、MongoDB 等,默认优先选择 CP,弱化 A。

BASE 理论

核心思想:

BASE 是对 CAP 中 AP 的一个扩展

一致性模型

数据的一致性模型可以分成以下三类:

分布式系统数据的强一致性、弱一致性和最终一致性可以通过 Quorum NRW 算法分析。

本地事务

tx.png

分布式事务典型方案

分类:

服务模式:

两阶段提交2PC(强一致性)

基于XA协议的两阶段提交:

2PC.png

缺点:

总的来说,XA 协议比较简单,成本较低,但是其单点问题,以及不能支持高并发(由于同步阻塞)依然是其最大的弱点。

本地消息表(最终一致性)

eBay 的架构师 Dan Pritchett,曾在一篇解释 BASE 原理的论文《Base:An Acid Alternative》中提到一个 eBay 分布式系统一致性问题的解决方案。

mq.png

它的核心思想是将需要分布式处理的任务通过消息或者日志的方式来异步执行,消息或日志可以存到本地文件、数据库或消息队列,再通过业务规则进行失败重试,它要求各服务的接口是幂等的。

本地消息表与业务数据表处于同一个数据库中,这样就能利用本地事务来保证在对这两个表的操作满足事务特性,并且使用了消息队列来保证最终一致性。

优点: 一种非常经典的实现,避免了分布式事务,实现了最终一致性。

缺点: 消息表会耦合到业务系统中,如果没有封装好的解决方案,会有很多杂活需要处理。

这个方案的核心在于第二阶段的重试和幂等执行。失败后重试,这是一种补偿机制,它是能保证系统最终一致的关键流程。

可靠消息的最终一致性代码示例

表结构

DROP TABLE IF EXISTS `rp_transaction_message`;

CREATE TABLE `rp_transaction_message` (
    `id` VARCHAR (50) NOT NULL DEFAULT '' COMMENT '主键ID',
    `version` INT (11) NOT NULL DEFAULT '0' COMMENT '版本号',
    `editor` VARCHAR (100) DEFAULT NULL COMMENT '修改者',
    `creater` VARCHAR (100) DEFAULT NULL COMMENT '创建者',
    `edit_time` datetime DEFAULT NULL COMMENT '最后修改时间',
    `create_time` datetime NOT NULL DEFAULT '0000-00-00 00:00:00' COMMENT '创建时间',
    `message_id` VARCHAR (50) NOT NULL DEFAULT '' COMMENT '消息ID',
    `message_body` LONGTEXT NOT NULL COMMENT '消息内容',
    `message_data_type` VARCHAR (50) DEFAULT NULL COMMENT '消息数据类型',
    `consumer_queue` VARCHAR (100) NOT NULL DEFAULT '' COMMENT '消费队列',
    `message_send_times` SMALLINT (6) NOT NULL DEFAULT '0' COMMENT '消息重发次数',
    `areadly_dead` VARCHAR (20) NOT NULL DEFAULT '' COMMENT '是否死亡',
    `status` VARCHAR (20) NOT NULL DEFAULT '' COMMENT '状态',
    `remark` VARCHAR (200) DEFAULT NULL COMMENT '备注',
    `field1` VARCHAR (200) DEFAULT NULL COMMENT '扩展字段1',
    `field2` VARCHAR (200) DEFAULT NULL COMMENT '扩展字段2',
    `field3` VARCHAR (200) DEFAULT NULL COMMENT '扩展字段3',
    PRIMARY KEY (`id`),
    KEY `AK_Key_2` (`message_id`)
) ENGINE = INNODB DEFAULT CHARSET = utf8;

public interface RpTransactionMessageService {
    
    /**
     * 预存储消息.
     */
    public int saveMessageWaitingConfirm(RpTransactionMessage rpTransactionMessage) throws MessageBizException;

    /**
     * 确认并发送消息.
     */
    public void confirmAndSendMessage(String messageId) throws MessageBizException;

    /**
     * 存储并发送消息.
     */
    public int saveAndSendMessage(RpTransactionMessage rpTransactionMessage) throws MessageBizException;

    /**
     * 直接发送消息.
     */
    public void directSendMessage(RpTransactionMessage rpTransactionMessage) throws MessageBizException;

    /**
     * 重发消息.
     */
    public void reSendMessage(RpTransactionMessage rpTransactionMessage) throws MessageBizException;

    /**
     * 根据messageId重发某条消息.
     */
    public void reSendMessageByMessageId(String messageId) throws MessageBizException;

    /**
     * 将消息标记为死亡消息.
     */
    public void setMessageToAreadlyDead(String messageId) throws MessageBizException;

    /**
     * 根据消息ID获取消息
     */
    public RpTransactionMessage getMessageByMessageId(String messageId) throws MessageBizException;

    /**
     * 根据消息ID删除消息
     */
    public void deleteMessageByMessageId(String messageId) throws MessageBizException;

    /**
     * 重发某个消息队列中的全部已死亡的消息.
     */
    public void reSendAllDeadMessageByQueueName(String queueName, int batchSize) throws MessageBizException;

    /**
     * 获取分页数据
     */
    PageBean listPage(PageParam pageParam, Map<String, Object> paramMap) throws MessageBizException;
    
}
@Service("rpTransactionMessageService")
public class RpTransactionMessageServiceImpl implements RpTransactionMessageService {
    
    private static final Log log = LogFactory.getLog(RpTransactionMessageServiceImpl.class);
    
    @Autowired
    private RpTransactionMessageDao rpTransactionMessageDao;
    
    @Autowired
    private JmsTemplate notifyJmsTemplate;

    public int saveMessageWaitingConfirm(RpTransactionMessage message) {
        if (message == null) {
            throw new MessageBizException(MessageBizException.SAVA_MESSAGE_IS_NULL, "保存的消息为空");
        }
        if (StringUtil.isEmpty(message.getConsumerQueue())) {
            throw new MessageBizException(MessageBizException.MESSAGE_CONSUMER_QUEUE_IS_NULL, "消息的消费队列不能为空 ");
        }
        message.setEditTime(new Date());
        message.setStatus(MessageStatusEnum.WAITING_CONFIRM.name());
        message.setAreadlyDead(PublicEnum.NO.name());
        message.setMessageSendTimes(0);
        return rpTransactionMessageDao.insert(message);
    }

    public void confirmAndSendMessage(String messageId) {
        final RpTransactionMessage message = getMessageByMessageId(messageId);
        if (message == null) {
            throw new MessageBizException(MessageBizException.SAVA_MESSAGE_IS_NULL, "根据消息id查找的消息为空");
        }
        message.setStatus(MessageStatusEnum.SENDING.name());
        message.setEditTime(new Date());
        rpTransactionMessageDao.update(message);
        notifyJmsTemplate.setDefaultDestinationName(message.getConsumerQueue());
        notifyJmsTemplate.send(new MessageCreator() {
            public Message createMessage(Session session) throws JMSException {
                return session.createTextMessage(message.getMessageBody());
            }
        });
    }

    public int saveAndSendMessage(final RpTransactionMessage message) {
        if (message == null) {
            throw new MessageBizException(MessageBizException.SAVA_MESSAGE_IS_NULL, "保存的消息为空");
        }
        if (StringUtil.isEmpty(message.getConsumerQueue())) {
            throw new MessageBizException(MessageBizException.MESSAGE_CONSUMER_QUEUE_IS_NULL, "消息的消费队列不能为空 ");
        }
        message.setStatus(MessageStatusEnum.SENDING.name());
        message.setAreadlyDead(PublicEnum.NO.name());
        message.setMessageSendTimes(0);
        message.setEditTime(new Date());
        int result = rpTransactionMessageDao.insert(message);
        notifyJmsTemplate.setDefaultDestinationName(message.getConsumerQueue());
        notifyJmsTemplate.send(new MessageCreator() {
            public Message createMessage(Session session) throws JMSException {
                return session.createTextMessage(message.getMessageBody());
            }
        });
        return result;
    }

    public void directSendMessage(final RpTransactionMessage message) {
        if (message == null) {
            throw new MessageBizException(MessageBizException.SAVA_MESSAGE_IS_NULL, "保存的消息为空");
        }
        if (StringUtil.isEmpty(message.getConsumerQueue())) {
            throw new MessageBizException(MessageBizException.MESSAGE_CONSUMER_QUEUE_IS_NULL, "消息的消费队列不能为空 ");
        }
        notifyJmsTemplate.setDefaultDestinationName(message.getConsumerQueue());
        notifyJmsTemplate.send(new MessageCreator() {
            public Message createMessage(Session session) throws JMSException {
                return session.createTextMessage(message.getMessageBody());
            }
        });
    }

    public void reSendMessage(final RpTransactionMessage message) {
        if (message == null) {
            throw new MessageBizException(MessageBizException.SAVA_MESSAGE_IS_NULL, "保存的消息为空");
        }
        if (StringUtil.isEmpty(message.getConsumerQueue())) {
            throw new MessageBizException(MessageBizException.MESSAGE_CONSUMER_QUEUE_IS_NULL, "消息的消费队列不能为空 ");
        }
        message.addSendTimes();
        message.setEditTime(new Date());
        rpTransactionMessageDao.update(message);
        notifyJmsTemplate.setDefaultDestinationName(message.getConsumerQueue());
        notifyJmsTemplate.send(new MessageCreator() {
            public Message createMessage(Session session) throws JMSException {
                return session.createTextMessage(message.getMessageBody());
            }
        });
    }

    public void reSendMessageByMessageId(String messageId) {
        final RpTransactionMessage message = getMessageByMessageId(messageId);
        if (message == null) {
            throw new MessageBizException(MessageBizException.SAVA_MESSAGE_IS_NULL, "根据消息id查找的消息为空");
        }
        int maxTimes = Integer.valueOf(PublicConfigUtil.readConfig("message.max.send.times"));
        if (message.getMessageSendTimes() >= maxTimes) {
            message.setAreadlyDead(PublicEnum.YES.name());
        }
        message.setEditTime(new Date());
        message.setMessageSendTimes(message.getMessageSendTimes() + 1);
        rpTransactionMessageDao.update(message);
        notifyJmsTemplate.setDefaultDestinationName(message.getConsumerQueue());
        notifyJmsTemplate.send(new MessageCreator() {
            public Message createMessage(Session session) throws JMSException {
                return session.createTextMessage(message.getMessageBody());
            }
        });
    }

    public void setMessageToAreadlyDead(String messageId) {
        RpTransactionMessage message = getMessageByMessageId(messageId);
        if (message == null) {
            throw new MessageBizException(MessageBizException.SAVA_MESSAGE_IS_NULL, "根据消息id查找的消息为空");
        }
        message.setAreadlyDead(PublicEnum.YES.name());
        message.setEditTime(new Date());
        rpTransactionMessageDao.update(message);
    }

    public RpTransactionMessage getMessageByMessageId(String messageId) {
        Map<String, Object> paramMap = new HashMap<String, Object>();
        paramMap.put("messageId", messageId);
        return rpTransactionMessageDao.getBy(paramMap);
    }

    public void deleteMessageByMessageId(String messageId) {
        Map<String, Object> paramMap = new HashMap<String, Object>();
        paramMap.put("messageId", messageId);
        rpTransactionMessageDao.delete(paramMap);
    }

    @SuppressWarnings("unchecked")
    public void reSendAllDeadMessageByQueueName(String queueName, int batchSize) {
        log.info("==>reSendAllDeadMessageByQueueName");
        int numPerPage = 1000;
        if (batchSize > 0 && batchSize < 100) {
            numPerPage = 100;
        } else if (batchSize > 100 && batchSize < 5000) {
            numPerPage = batchSize;
        } else if (batchSize > 5000) {
            numPerPage = 5000;
        } else {
            numPerPage = 1000;
        }
        int pageNum = 1;
        Map<String, Object> paramMap = new HashMap<String, Object>();
        paramMap.put("consumerQueue", queueName);
        paramMap.put("areadlyDead", PublicEnum.YES.name());
        paramMap.put("listPageSortType", "ASC");
        Map<String, RpTransactionMessage> messageMap = new HashMap<String, RpTransactionMessage>();
        List<Object> recordList = new ArrayList<Object>();
        int pageCount = 1;
        PageBean pageBean = rpTransactionMessageDao.listPage(new PageParam(pageNum, numPerPage), paramMap);
        recordList = pageBean.getRecordList();
        if (recordList == null || recordList.isEmpty()) {
            log.info("==>recordList is empty");
            return;
        }
        pageCount = pageBean.getTotalPage();
        for (final Object obj : recordList) {
            final RpTransactionMessage message = (RpTransactionMessage) obj;
            messageMap.put(message.getMessageId(), message);
        }
        for (pageNum = 2; pageNum <= pageCount; pageNum++) {
            pageBean = rpTransactionMessageDao.listPage(new PageParam(pageNum, numPerPage), paramMap);
            recordList = pageBean.getRecordList();
            if (recordList == null || recordList.isEmpty()) {
                break;
            }
            for (final Object obj : recordList) {
                final RpTransactionMessage message = (RpTransactionMessage) obj;
                messageMap.put(message.getMessageId(), message);
            }
        }
        recordList = null;
        pageBean = null;
        for (Map.Entry<String, RpTransactionMessage> entry : messageMap.entrySet()) {
            final RpTransactionMessage message = entry.getValue();
            message.setEditTime(new Date());
            message.setMessageSendTimes(message.getMessageSendTimes() + 1);
            rpTransactionMessageDao.update(message);
            notifyJmsTemplate.setDefaultDestinationName(message.getConsumerQueue());
            notifyJmsTemplate.send(new MessageCreator() {
                public Message createMessage(Session session) throws JMSException {
                    return session.createTextMessage(message.getMessageBody());
                }
            });
        }
    }

    @SuppressWarnings("unchecked")
    public PageBean<RpTransactionMessage> listPage(PageParam pageParam, Map<String, Object> paramMap) {
        return rpTransactionMessageDao.listPage(pageParam, paramMap);
    }
    
}
@Component("messageBiz")
public class MessageBiz {

    private static final Log log = LogFactory.getLog(MessageBiz.class);

    @Autowired
    private RpTradePaymentQueryService rpTradePaymentQueryService;

    @Autowired
    private RpTransactionMessageService rpTransactionMessageService;

    /**
     * 处理[waiting_confirm]状态的消息
     * @param messages
     */
    public void handleWaitingConfirmTimeOutMessages(Map<String, RpTransactionMessage> messageMap) {
        log.debug("开始处理[waiting_confirm]状态的消息,总条数[" + messageMap.size() + "]");
        // 单条消息处理(目前该状态的消息,消费队列全部是accounting,如果后期有业务扩充,需做队列判断,做对应的业务处理。)
        for (Map.Entry<String, RpTransactionMessage> entry : messageMap.entrySet()) {
            RpTransactionMessage message = entry.getValue();
            try {
                log.debug("开始处理[waiting_confirm]消息ID为[" + message.getMessageId() + "]的消息");
                String bankOrderNo = message.getField1();
                RpTradePaymentRecord record = rpTradePaymentQueryService.getRecordByBankOrderNo(bankOrderNo);
                // 如果订单成功,把消息改为待处理,并发送消息
                if (TradeStatusEnum.SUCCESS.name().equals(record.getStatus())) {
                    // 确认并发送消息
                    rpTransactionMessageService.confirmAndSendMessage(message.getMessageId());
                } else if (TradeStatusEnum.WAITING_PAYMENT.name().equals(record.getStatus())) {
                    // 订单状态是等到支付,可以直接删除数据
                    log.debug("订单没有支付成功,删除[waiting_confirm]消息id[" + message.getMessageId() + "]的消息");
                    rpTransactionMessageService.deleteMessageByMessageId(message.getMessageId());
                }
                log.debug("结束处理[waiting_confirm]消息ID为[" + message.getMessageId() + "]的消息");
            } catch (Exception e) {
                log.error("处理[waiting_confirm]消息ID为[" + message.getMessageId() + "]的消息异常:", e);
            }
        }
    }

    /**
     * 处理[SENDING]状态的消息
     * @param messages
     */
    public void handleSendingTimeOutMessage(Map<String, RpTransactionMessage> messageMap) {
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        log.debug("开始处理[SENDING]状态的消息,总条数[" + messageMap.size() + "]");
        // 根据配置获取通知间隔时间
        Map<Integer, Integer> notifyParam = getSendTime();
        // 单条消息处理
        for (Map.Entry<String, RpTransactionMessage> entry : messageMap.entrySet()) {
            RpTransactionMessage message = entry.getValue();
            try {
                log.debug("开始处理[SENDING]消息ID为[" + message.getMessageId() + "]的消息");
                // 判断发送次数
                int maxTimes = Integer.valueOf(PublicConfigUtil.readConfig("message.max.send.times"));
                log.debug("[SENDING]消息ID为[" + message.getMessageId() + "]的消息,已经重新发送的次数[" 
                        + message.getMessageSendTimes() + "]");
                // 如果超过最大发送次数直接退出
                if (maxTimes < message.getMessageSendTimes()) {
                    // 标记为死亡
                    rpTransactionMessageService.setMessageToAreadlyDead(message.getMessageId());
                    continue;
                }
                // 判断是否达到发送消息的时间间隔条件
                int reSendTimes = message.getMessageSendTimes();
                int times = notifyParam.get(reSendTimes == 0 ? 1 : reSendTimes);
                long currentTimeInMillis = Calendar.getInstance().getTimeInMillis();
                long needTime = currentTimeInMillis - times * 60 * 1000;
                long hasTime = message.getEditTime().getTime();
                // 判断是否达到了可以再次发送的时间条件
                if (hasTime > needTime) {
                    log.debug("currentTime[" + sdf.format(new Date()) + "],[SENDING]消息上次发送时间[" 
                            + sdf.format(message.getEditTime()) + "],必须过了[" + times + "]分钟才可以再发送。");
                    continue;
                }
                // 重新发送消息
                rpTransactionMessageService.reSendMessage(message);
                log.debug("结束处理[SENDING]消息ID为[" + message.getMessageId() + "]的消息");
            } catch (Exception e) {
                log.error("处理[SENDING]消息ID为[" + message.getMessageId() + "]的消息异常:", e);
            }
        }
    }

    /**
     * 根据配置获取通知间隔时间
     * @return
     */
    private Map<Integer, Integer> getSendTime() {
        Map<Integer, Integer> notifyParam = new HashMap<Integer, Integer>();
        notifyParam.put(1, Integer.valueOf(PublicConfigUtil.readConfig("message.send.1.time")));
        notifyParam.put(2, Integer.valueOf(PublicConfigUtil.readConfig("message.send.2.time")));
        notifyParam.put(3, Integer.valueOf(PublicConfigUtil.readConfig("message.send.3.time")));
        notifyParam.put(4, Integer.valueOf(PublicConfigUtil.readConfig("message.send.4.time")));
        notifyParam.put(5, Integer.valueOf(PublicConfigUtil.readConfig("message.send.5.time")));
        return notifyParam;
    }

}
public class AccountingMessageListener implements SessionAwareMessageListener<Message> {

    private static final Log LOG = LogFactory.getLog(AccountingMessageListener.class);

    /**
     * 会计队列模板(由Spring创建并注入进来)
     */
    @Autowired
    private JmsTemplate notifyJmsTemplate;

    @Autowired
    private RpAccountingVoucherService rpAccountingVoucherService;

    @Autowired
    private RpTransactionMessageService rpTransactionMessageService;

    public synchronized void onMessage(Message message, Session session) {
        RpAccountingVoucher param = null;
        String strMessage = null;
        try {
            ActiveMQTextMessage objectMessage = (ActiveMQTextMessage) message;
            strMessage = objectMessage.getText();
            LOG.info("strMessage1 accounting:" + strMessage);
            param = JSONObject.parseObject(strMessage, RpAccountingVoucher.class);
            // 这里转换成相应的对象还有问题
            if (param == null) {
                LOG.info("param参数为空");
                return;
            }
            int entryType = param.getEntryType();
            double payerChangeAmount = param.getPayerChangeAmount();
            String voucherNo = param.getVoucherNo();
            String payerAccountNo = param.getPayerAccountNo();
            int fromSystem = param.getFromSystem();
            int payerAccountType = 0;
            if (param.getPayerAccountType() != null && !param.getPayerAccountType().equals("")) {
                payerAccountType = param.getPayerAccountType();
            }
            double payerFee = param.getPayerFee();
            String requestNo = param.getRequestNo();
            double bankChangeAmount = param.getBankChangeAmount();
            double receiverChangeAmount = param.getReceiverChangeAmount();
            String receiverAccountNo = param.getReceiverAccountNo();
            String bankAccount = param.getBankAccount();
            String bankChannelCode = param.getBankChannelCode();
            double profit = param.getProfit();
            double income = param.getIncome();
            double cost = param.getCost();
            String bankOrderNo = param.getBankOrderNo();
            int receiverAccountType = 0;
            double payAmount = param.getPayAmount();
            if (param.getReceiverAccountType() != null && !param.getReceiverAccountType().equals("")) {
                receiverAccountType = param.getReceiverAccountType();
            }
            double receiverFee = param.getReceiverFee();
            String remark = param.getRemark();
            rpAccountingVoucherService.createAccountingVoucher(entryType, voucherNo, payerAccountNo, receiverAccountNo, 
                    payerChangeAmount, receiverChangeAmount, income, cost, profit, bankChangeAmount, requestNo, 
                    bankChannelCode, bankAccount, fromSystem, remark, bankOrderNo, payerAccountType, payAmount, 
                    receiverAccountType, payerFee, receiverFee);
            //删除消息
            rpTransactionMessageService.deleteMessageByMessageId(param.getMessageId());
        } catch (BizException e) {
            // 业务异常,不再写会队列
            LOG.error("==>BizException", e);
        } catch (Exception e) {
            // 不明异常不再写会队列
            LOG.error("==>Exception", e);
        }
    }

    public JmsTemplate getNotifyJmsTemplate() {
        return notifyJmsTemplate;
    }

    public void setNotifyJmsTemplate(JmsTemplate notifyJmsTemplate) {
        this.notifyJmsTemplate = notifyJmsTemplate;
    }

    public RpAccountingVoucherService getRpAccountingVoucherService() {
        return rpAccountingVoucherService;
    }

    public void setRpAccountingVoucherService(RpAccountingVoucherService rpAccountingVoucherService) {
        this.rpAccountingVoucherService = rpAccountingVoucherService;
    }

}

与常规MQ的ACK机制对比

常规MQ确认机制:

常规MQ队列消息的处理流程无法实现消息发送一致性,因此直接使用现成的MQ中间件产品无法实现可靠消息最终一致性的分布式事务解决方案

消息发送一致性:是指产生消息的业务动作与消息发送的一致。也就是说,如果业务操作成功,那么由这个业务操作所产生的消息一定要成功投递出去(一般是发送到kafka、rocketmq、rabbitmq等消息中间件中),否则就丢消息。

下面用伪代码进行演示消息发送和投递的不可靠性:

先进行数据库操作,再发送消息:

public void test1(){
    //1 数据库操作
    //2 发送MQ消息
}

这种情况下无法保证数据库操作与发送消息的一致性,因为可能数据库操作成功,发送消息失败。

先发送消息,再操作数据库:

public void test1(){
    //1 发送MQ消息
    //2 数据库操作
}

这种情况下无法保证数据库操作与发送消息的一致性,因为可能发送消息成功,数据库操作失败。

在数据库事务中,先发送消息,后操作数据库:

@Transactional
public void test1(){
    //1 发送MQ消息
    //2 数据库操作
}

这里使用spring 的@Transactional注解,方法里面的操作都在一个事务中。同样无法保证一致性,因为发送消息成功了,数据库操作失败的情况下,数据库操作是回滚了,但是MQ消息没法进行回滚。

在数据库事务中,先操作数据库,后发送消息:

@Transactional
public void test1(){
    //1 数据库操作
    //2 发送MQ消息
}

这种情况下,貌似没有问题,如果发送MQ消息失败,抛出异常,事务一定会回滚(加上了@Transactional注解后,spring方法抛出异常后,会自动进行回滚)。

这只是一个假象,因为发送MQ消息可能事实上已经成功,如果是响应超时导致的异常。这个时候,数据库操作依然回滚,但是MQ消息实际上已经发送成功,导致不一致。

与消息发送一致性流程的对比:

TCC (Try-Confirm-Cancel)补偿模式(最终一致性)

TCC 其实就是采用的补偿机制,其核心思想是:针对每个操作,都要注册一个与其对应的确认和补偿(撤销)操作。

它分为三个阶段:

tcc.jpeg

举例(Bob 要向 Smith 转账):

优点:
跟2PC比起来,实现以及流程相对简单了一些,但数据的一致性比2PC也要差一些

缺点:
缺点还是比较明显的,在2,3步中都有可能失败。TCC属于应用层的一种补偿方式,所以需要程序员在实现的时候多写很多补偿的代码,在一些场景中,一些业务流程可能用TCC不太好定义及处理。

上一篇下一篇

猜你喜欢

热点阅读