MQ

RocketMQ事务消息

2020-01-14  本文已影响0人  丑人林宗己

抽空扒一下RocketMQ的事务消息,看看具体的实现,版本4.6.0

基本原理

RocketMQ的事务消息实现是基于二阶段协议实现的。即先发送半消息,成功之后执行本次事务,之后发送事务结束消息

这里其实之前一直存在一个误解。这个误解其实一直都在怀疑只是没时间确认,因为无法确认RocketMQ怎么可能得到事务对象

在执行完半消息发送之后,会调用TransactionListener写入数据库,写入数据库成功后等到事务正常提交之后再进行发送事务结束消息,即本地事务需要与调用TransactionListener写入数据库需要在一个事务内。

如下,期望是本地事务reduct执行与消息发送在一个事务之内,以此达到数据库事务的特性,当发送结束事务消息成功后,如果此时事务异常回滚了,消息与事务的一致性就被破坏了。

public class BusinessService {
    
    @Transactional
    public void reduct() {
        businessService.reduct();///
        rocketmqProducer.sendMessageInTransaction(
                message, args
        );
    }
}

相反过来,RocketMQ的事务消息的本质是保证消息一定会发送成功,与本地事务并非是一致性。或者可以先发送半消息,然后再执行事务,之后再根据事务的结果是否执行事务结束消息。

public class BusinessService {

    public BusinessService() {
        rocketmqProducer.setTransactionListener(new TransactionListenerImpl());
    }
    
    public static class TransactionListenerImpl implements TransactionListener {
        @Override
        public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
            return businessService.reduct();///;
        }

        @Override
        public LocalTransactionState checkLocalTransaction(MessageExt msg) {
            return businessService.check();///;
        }
    }
    
    // 对外提供该方法
    public void reductAndSendMsg() {
        rocketmqProducer.sendMessageInTransaction(
                message, args
        );
    }

    @Transactional
    public void reduct() {
        // TODO
    }
}

RocketMQ的这种基于两阶段协议的事务消息,相比普通消息而言是有了事务成功的保证,但是使用时还是要慎重,因为它非常依赖消息队列,一旦消息队列服务波动涉及到的都将无法提供服务。

源码分析

client

客户端的源码基本上都在DefaultMQProducerImpl#sendMessageInTransaction()方法中,核心片段如下

MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());
try {
    sendResult = this.send(msg); // 半消息,注意是同步的消息
} catch (Exception e) {
    throw new MQClientException("send message Exception", e);
}
//....
switch (sendResult.getSendStatus()) {
    case SEND_OK: {
        try {
            //...
           localTransactionState = transactionListener.executeLocalTransaction(msg, arg);
           // .....
        } catch (Throwable e) {
            log.info("executeLocalTransactionBranch exception", e);
            log.info(msg.toString());
            localException = e;
        }
    }
    break;
    case FLUSH_DISK_TIMEOUT:
    case FLUSH_SLAVE_TIMEOUT:
    case SLAVE_NOT_AVAILABLE:
        localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;
        break;
    default:
        break;
}

try {
    this.endTransaction(sendResult, localTransactionState, localException); // 异步消息
} catch (Exception e) {
    log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e);
}

broker

服务端关注几个点

其次逻辑队列是RocketMQ存储中的一个非常核心的架构,也是需要重点关注

这里重点看看消息回查的逻辑即可,其他的按照类名称点进去翻一翻即可。

消息回查的基本逻辑,从RMQ_SYS_TRANS_HALF_TOPIC取出该主题下的一组逻辑队列,对队列进行循环,取出每个队列记录的上次消费的物理位置(此处的消费并非是消息被消费而是消息被处理过),同样,根据该逻辑队列从RMQ_SYS_TRANS_OP_HALF_TOPIC中取出相对应的逻辑队列,该逻辑队列存储的试衣镜处理过的消息(包括处理成功或者失败)。之后再根据 一些参数进行判断(具体请看代码),该丢弃的丢弃(丢弃进入CID_RMQ_SYS_TRANS队列),该回查的回查,最后将处理的队列进度更新到消费进度(consumer_table)中。

事务消息的整个逻辑大致分为三个点:

核心代码片段就不贴出来了,因为TransactionalMessageServiceImpl#check整个方法都是非常核心的,代码又长,有需要自行查阅。

image.png

总结

事实上个人并不是很赞同使用事务消息,至少看起来略显得鸡肋(目前的版本),在分布式事务场景下只能保证消息一定发送,而分布式事务是需要保证各个节点事务严格一致性,如果发送成功了但是没有被正确消费等等,该方法就不适合了。只能说有特定场景,但是还不具备普适性,当然技术方案自然不可能做到面面俱到,能解决部分场景已然很好。

其次服务过于依赖消息队列服务,如果消息队列出现三五分钟的动荡,上游的服务基本上是废弃的。而大多数场景下是上游的服务并非严格依赖消息队列,所以即便是消息队列动荡,上游理应可以继续提供服务,下游可能存在数据延迟更新等问题,需要介入修复。比如下单后的发货,存在三五分钟通知延迟是可以接受的,但是不能下单就不能接受。

也不能否则存在上下游严格依赖的场景。但是如果需要强一致性的解决方案,是不是应该考虑使用其他技术方案,比如Seata

上一篇 下一篇

猜你喜欢

热点阅读