RocketMQ-事务消息

2020-07-17  本文已影响0人  Travis_Wu

一、事务消息的引出

以购物场景为例,张三购买物品,账户扣款 100 元的同时,需要保证在下游的会员服务中给该账户增加 100 积分。而扣款的业务和增加积分的业务是在两个不同的应用,正常处理逻辑一般是先扣除100元,然后网络通知积分服务增加100积分。如下图:


事务消息1.png

以上过程会有三个问题:

  1. 账号服务在扣款的时候宕机了,这时候可能扣款成功,也可能扣款失败
  2. 由于网络稳定性无法保证,通知扣积分服务可能失败,但是扣款成功了
  3. 扣款成功,并且通知成功,但是增加积分的时候失败了
    实际上,rocketmq的事务消息解决的是问题1和问题2这种场景,也就是解决本地事务执行与消息发送的原子性问题。即解决Producer执行业务逻辑成功之后投递消息可能失败的场景。

而对于问题3这种场景,rocketmq提供了消费失败重试的机制。但是如果消费重试依然失败怎么办?rocketmq本身并没有提供解决这种问题的办法,例如如果加积分失败了,则需要回滚事务,实际上增加了业务复杂度,而官方给予的建议是:人工解决。RocketMQ目前暂时没有解决这个问题的原因是:在设计实现消息系统时,我们需要衡量是否值得花这么大的代价来解决这样一个出现概率非常小的问题。

二、事务消息的实现思路和过程

image.png

三、事务消息的使用

关于rocketmq事务消息如何使用,最好的学习思路是从github上下载下源码,参考demo示例。这里也以官方的demo讲解如何使用(在demo基础上做了一点修改)


四、源码分析

public TransactionSendResult sendMessageInTransaction(final Message msg,final TransactionListener tranExecuter, final Object arg){
       //1.发送prepare消息
       SendResult sendResult = this.send(msg);
       
       LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
       Throwable localException = null;
       switch (sendResult.getSendStatus()) {
           case SEND_OK: {
               try {
                   //2.如果prepare消息发送成功,执行TransactionListener的executeLocalTransaction实现,也就是本地事务方法
                   localTransactionState = tranExecuter.executeLocalTransaction(msg, arg);
               } catch (Throwable e) {
                   localException = e;
               }
           }
           break;
           case FLUSH_DISK_TIMEOUT:
           case FLUSH_SLAVE_TIMEOUT:
           case SLAVE_NOT_AVAILABLE:
               localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;
               break;
           default:
               break;
       }
       //3.结束事务,其实就是针对前面发送的prepare消息再发送一条确认消息(这条确认消息包含了本地事务执行的结果,这里可以猜测broker接收到该确认消息和之前的prepare消息必然有比较大的关联)
       this.endTransaction(sendResult, localTransactionState, localException);
   }
public class TransactionalMessageCheckService extends ServiceThread {
    @Override
    public void run() {
        //检查间隔,默认一分钟,可配置
        long checkInterval = brokerController.getBrokerConfig().getTransactionCheckInterval();
        while (!this.isStopped()) {
            try {
                //等待一分钟,以实现每一分钟回查需要的事务消息结果
                waitPoint.await(interval, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                log.error("Interrupted", e);
            } finally {
                //处理事务消息回查的核心逻辑方法
                brokerController.getTransactionalMessageService().check(timeout, checkMax,this.brokerController.getTransactionalMessageCheckListener());
            }
        }
    }
}

public class TransactionalMessageServiceImpl implements TransactionalMessageService {

    public void check(long transactionTimeout, int transactionCheckMax,AbstractTransactionalMessageCheckListener listener) {
            //获取到所有的RMQ_SYS_TRANS_HALF_TOPIC消息队列(prepare消息)
            Set<MessageQueue> msgQueues = transactionalMessageBridge.fetchMessageQueues("RMQ_SYS_TRANS_HALF_TOPIC");
            for (MessageQueue messageQueue : msgQueues) {
                //从RMQ_SYS_TRANS_OP_HALF_TOPIC消息队列中获取到prepare消息对应的op消息(确认消息)
                MessageQueue opQueue = getOpQueue(messageQueue);
                //prepare消息的offset
                long halfOffset = transactionalMessageBridge.fetchConsumeOffset(messageQueue);
                //prepare消息
                MessageExt msgExt = getHalfMsg(messageQueue, i);
                //中间会有一堆的逻辑判断用于是否需要回查事务状态。
                //例如:是否超过了回查的次数(默认五次)、消息是否已经失效了、对应的op消息是否已经处理了等。
                if (isNeedCheck) {
                    //交给线程池异步处理回调查询事务的状态。
                    listener.resolveHalfMsg(msgExt);
                }
            }
    }
}
//接收broker的回调,回查本地事务情况,进行相应处理
@Override
public void checkTransactionState(final String addr, final MessageExt msg,final CheckTransactionStateRequestHeader header) {
    //处理broker检查本地事务处理情况的回调任务
    Runnable request = new Runnable() {
        @Override
        public void run() {
                //执行TransactionListener实现的checkLocalTransaction方法,检查本地事务处理情况。
                LocalTransactionState localTransactionState = transactionCheckListener.checkLocalTransaction(message);
                //将检查本地事务处理情况再次发送给broker。
                this.processTransactionState(localTransactionState,group,exception);
        }

        //处理本地事务处理的结果反馈
        private void processTransactionState(final LocalTransactionState localTransactionState,final String producerGroup,final Throwable exception) {
            final EndTransactionRequestHeader thisHeader = new EndTransactionRequestHeader();
            ...
            根据检查到的本地事务执行的不同结果封装成不同的处理类型发送给broker
            switch (localTransactionState) {
                case COMMIT_MESSAGE:
                    thisHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);
                    break;
                case ROLLBACK_MESSAGE:
                    thisHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE);
                    break;
                case UNKNOW:
                    thisHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE);
                    break;
                default:
                    break;
            }
            //结果反馈给broker
            DefaultMQProducerImpl.this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr,thisHeader,remark,3000);
        }
    };
    //提交任务到线程池
    this.checkExecutor.submit(request);
}
上一篇下一篇

猜你喜欢

热点阅读