(十五)事务的实现原理

2021-08-17  本文已影响0人  guessguess

在日常的代码中,一个操作中,往往包含着各种业务,比如一个A转账给B,A扣钱,B加钱,只有这俩个操作完全成功,这个操作才算是完成,任意一个操作失败都会导致整个操作失败。
对于消息队列的应用也会存在以上场景,有时我们发消息的同时,也需要进行其他操作。
如果操作失败的同时,消息发出去了怎么办,所以这里需要引入事务的支持。
rocketmq是在于4.3.0引入分布式事务。如果要使用使用的话,需要使用4.3.0及以后的版本。

由于整个过程其实还是比较复杂的,所以直接画了个图。方便理解。
rocketmq采取的是消息确认的机制。

如何使用

先来看看TransactionListener这个类

TransactionListener

public interface TransactionListener {
    LocalTransactionState executeLocalTransaction(final Message msg, final Object arg);
    LocalTransactionState checkLocalTransaction(final MessageExt msg);
}

上面总共2个方法。
executeLocalTransaction执行本地事务
checkLocalTransaction检查本地事务
只要实现好这个接口就可以了。
下面来看看demo

    public static void testTransaction() throws MQClientException, UnsupportedEncodingException {
        设置生产者的组
        TransactionMQProducer tp = new TransactionMQProducer("group");
        设置注册中心的地址
        tp.setInstanceName("127.0.0.1:9876");
        实现事务监听器
        tp.setTransactionListener(new TransactionListener() {
            @Override
            public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
                return LocalTransactionState.UNKNOW;
            }
            @Override
            public LocalTransactionState checkLocalTransaction(MessageExt msg) {
                System.out.println(msg.getTransactionId());
                return LocalTransactionState.UNKNOW;
            }
        });
        线程池-用于处理事务状态回查(broker会定时回查)
        ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r);
                thread.setName("client-transaction-msg-check-thread");
                return thread;
            }
        });
        tp.setExecutorService(executorService);
        tp.start();
        Message message = new Message("transaction", "user", new String("userId=1").getBytes("UTF-8"));
        tp.sendMessageInTransaction(message, 1);
    }

下面分别说说这俩个方法的作用

executeLocalTransaction

执行本地事务,返回事务状态

checkLocalTransaction

检查事务状态,在executeLocalTransaction执行完后,生产者会将事务状态进行上报,broker可以根据事务状态决定是否将消息转发(转发到真实队列,消费者从而进行消费)。
倘若在生产者进行事务状态上报的时候,生产者挂了,那么这个时候,Broker会选择同一个组的生产者进行状态回查。

所以如果要实现事务,这里的话,我们只需要去实现事务监听器里面的方法就可以了。至于逻辑的话,需要自己去实现。

实现原理

事务的实现原理

事务生产者是如何发送消息的?

先从发送方法定位。

public class DefaultMQProducerImpl implements MQProducerInner {
    public TransactionSendResult sendMessageInTransaction(final Message msg,
        final LocalTransactionExecuter localTransactionExecuter, final Object arg)
        throws MQClientException {
        1.检查事务监听器,没有设置事务监听器会报错
        TransactionListener transactionListener = getCheckListener();
        if (null == localTransactionExecuter && null == transactionListener) {
            throw new MQClientException("tranExecutor is null", null);
        }

        // ignore DelayTimeLevel parameter
        忽略延时级别
        if (msg.getDelayTimeLevel() != 0) {
            MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_DELAY_TIME_LEVEL);
        }
        检查消息
        Validators.checkMessage(msg, this.defaultMQProducer);

        SendResult sendResult = null;
        2.设置事务属性,以及消息归属的生产者的组
        MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
        MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());
        try {
            3.进行消息的发送
            sendResult = this.send(msg);
        } catch (Exception e) {
            throw new MQClientException("send message Exception", e);
        }
        默认事务状态为未知
        LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
        Throwable localException = null;
        根据消息发送的状态进行处理
        switch (sendResult.getSendStatus()) {
            消息发送成功
            case SEND_OK: {
                try {
                    if (sendResult.getTransactionId() != null) {
                        msg.putUserProperty("__transactionId__", sendResult.getTransactionId());
                    }
                    获取事务Id
                    String transactionId = msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
                    if (null != transactionId && !"".equals(transactionId)) {
                        msg.setTransactionId(transactionId);
                    }
                    事务执行器为空,所以不会执行此处代码
                    if (null != localTransactionExecuter) {
                        localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg);
                    } else if (transactionListener != null) {
                        4.通过事务监听器执行本地事务,返回本地事务状态
                        log.debug("Used new transaction API");
                        localTransactionState = transactionListener.executeLocalTransaction(msg, arg);
                    }
                    if (null == localTransactionState) {
                        若本地事务状态结果为空,默认使用未知状态
                        localTransactionState = LocalTransactionState.UNKNOW;
                    }

                    if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) {
                        log.info("executeLocalTransactionBranch return {}", localTransactionState);
                        log.info(msg.toString());
                    }
                } catch (Throwable e) {
                    log.info("executeLocalTransactionBranch exception", e);
                    log.info(msg.toString());
                    localException = e;
                }
            }
            break;
            case FLUSH_DISK_TIMEOUT:
            case FLUSH_SLAVE_TIMEOUT:
            case SLAVE_NOT_AVAILABLE:
                localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;
                break;
            default:
                break;
        }

        try {
            5.结束本地事务,对事务结果进行上报
            this.endTransaction(sendResult, localTransactionState, localException);
        } catch (Exception e) {
            log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e);
        }

        TransactionSendResult transactionSendResult = new TransactionSendResult();
        transactionSendResult.setSendStatus(sendResult.getSendStatus());
        transactionSendResult.setMessageQueue(sendResult.getMessageQueue());
        transactionSendResult.setMsgId(sendResult.getMsgId());
        transactionSendResult.setQueueOffset(sendResult.getQueueOffset());
        transactionSendResult.setTransactionId(sendResult.getTransactionId());
        transactionSendResult.setLocalTransactionState(localTransactionState);
        return transactionSendResult;
    }

}

发送消息的流程如下


事务消息生产者发送消息的流程如下

从上面看到,本地事务执行完后,会进行上报。
broker端会根据,上报的本地事务的状态,决定该消息是否可以被消费。

那么broker端是如何处理事务状态上报的

先来跟踪一下代码,查看上报事务状态的实现。

public class DefaultMQProducerImpl implements MQProducerInner {
    public void endTransaction(
        final SendResult sendResult,
        final LocalTransactionState localTransactionState,
        final Throwable localException) throws RemotingException, MQBrokerException, InterruptedException, UnknownHostException {
        final MessageId id;
        if (sendResult.getOffsetMsgId() != null) {
            id = MessageDecoder.decodeMessageId(sendResult.getOffsetMsgId());
        } else {
            id = MessageDecoder.decodeMessageId(sendResult.getMsgId());
        }
        String transactionId = sendResult.getTransactionId();
        final String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(sendResult.getMessageQueue().getBrokerName());
        EndTransactionRequestHeader requestHeader = new EndTransactionRequestHeader();
        requestHeader.setTransactionId(transactionId);
        requestHeader.setCommitLogOffset(id.getOffset());
        根据事务状态,设置请求头
        switch (localTransactionState) {
            case COMMIT_MESSAGE:
                requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);
                break;
            case ROLLBACK_MESSAGE:
                requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE);
                break;
            case UNKNOW:
                requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE);
                break;
            default:
                break;
        }

        requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
        requestHeader.setTranStateTableOffset(sendResult.getQueueOffset());
        requestHeader.setMsgId(sendResult.getMsgId());
        String remark = localException != null ? ("executeLocalTransactionBranch exception: " + localException.toString()) : null;
        具体看看这个方法的实现
        this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, requestHeader, remark,
            this.defaultMQProducer.getSendMsgTimeout());
    }

}

public class MQClientAPIImpl {
    public void endTransactionOneway(
        final String addr,
        final EndTransactionRequestHeader requestHeader,
        final String remark,
        final long timeoutMillis
    ) throws RemotingException, MQBrokerException, InterruptedException {
        这里就很明显了,RequestCode.END_TRANSACTION。
        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.END_TRANSACTION, requestHeader);

        request.setRemark(remark);
        this.remotingClient.invokeOneway(addr, request, timeoutMillis);
    }
}

通过生产者中进行事务上报的方法,可以确定请求对应的Code为RequestCode.END_TRANSACTION。
所以只需要通过brokerController找到对应的processor即可。
通过requestCode确定最后处理请求的处理器为EndTransactionProcessor。

EndTransactionProcessor是如何处理上报的事务的

public class EndTransactionProcessor extends AsyncNettyRequestProcessor implements NettyRequestProcessor {

    @Override
    public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws
        RemotingCommandException {
        final RemotingCommand response = RemotingCommand.createResponseCommand(null);
        final EndTransactionRequestHeader requestHeader =
            (EndTransactionRequestHeader)request.decodeCommandCustomHeader(EndTransactionRequestHeader.class);
        LOGGER.debug("Transaction request:{}", requestHeader);
        只有master可以去结束事务
        if (BrokerRole.SLAVE == brokerController.getMessageStoreConfig().getBrokerRole()) {
            response.setCode(ResponseCode.SLAVE_NOT_AVAILABLE);
            LOGGER.warn("Message store is slave mode, so end transaction is forbidden. ");
            return response;
        }
        省略部分代码。。。日志打印相关
        OperationResult result = new OperationResult();
        if (MessageSysFlag.TRANSACTION_COMMIT_TYPE == requestHeader.getCommitOrRollback()) {
            如果事务状态为提交,则将消息进行提交(转发到真实队列中),消费者可以进行消费
            result = this.brokerController.getTransactionalMessageService().commitMessage(requestHeader);
            if (result.getResponseCode() == ResponseCode.SUCCESS) {
                RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);
                if (res.getCode() == ResponseCode.SUCCESS) {
                    提交成功则结束事务,将prepare消息转为真实消息。
                    MessageExtBrokerInner msgInner = endMessageTransaction(result.getPrepareMessage());
                    msgInner.setSysFlag(MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), requestHeader.getCommitOrRollback()));
                    msgInner.setQueueOffset(requestHeader.getTranStateTableOffset());
                    msgInner.setPreparedTransactionOffset(requestHeader.getCommitLogOffset());
                    msgInner.setStoreTimestamp(result.getPrepareMessage().getStoreTimestamp());
                    MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_TRANSACTION_PREPARED);
                    将真实消息进行保存。
                    RemotingCommand sendResult = sendFinalMessage(msgInner);
                    if (sendResult.getCode() == ResponseCode.SUCCESS) {
                        保存成功,删除prepare消息
 this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
                    }
                    return sendResult;
                }
                return res;
            }
        } else if (MessageSysFlag.TRANSACTION_ROLLBACK_TYPE == requestHeader.getCommitOrRollback()) {
            result = this.brokerController.getTransactionalMessageService().rollbackMessage(requestHeader);
            if (result.getResponseCode() == ResponseCode.SUCCESS) {
                RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);
                if (res.getCode() == ResponseCode.SUCCESS) {
                    回滚---直接删除prepare消息即可
this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
                }
                return res;
            }
        }
        response.setCode(result.getResponseCode());
        response.setRemark(result.getResponseRemark());
        return response;
    }
}

处理的逻辑比较简单。
1.如果是事务状态为提交
则将消息从prepare消息转为真实消息,进行保存。保存成功后,将prepare消息删除。
2.如果事务状态是回滚
将prepare消息删除即可。

其实一开始发送的消息都是prepare消息,只有当事务状态为commit的时候,才会将prepare消息转化为真实消息,存放到真正对应的主题对应的队列中。这个时候消费者才可以进行消费。
原理如下


broker对于事务上报的处理

最后看看消息还原的代码

public class EndTransactionProcessor extends AsyncNettyRequestProcessor implements NettyRequestProcessor {
    private MessageExtBrokerInner endMessageTransaction(MessageExt msgExt) {
        MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
        设置真实的topic
        msgInner.setTopic(msgExt.getUserProperty(MessageConst.PROPERTY_REAL_TOPIC));
        设置真实的队列Id
msgInner.setQueueId(Integer.parseInt(msgExt.getUserProperty(MessageConst.PROPERTY_REAL_QUEUE_ID)));
        msgInner.setBody(msgExt.getBody());
        msgInner.setFlag(msgExt.getFlag());
        msgInner.setBornTimestamp(msgExt.getBornTimestamp());
        msgInner.setBornHost(msgExt.getBornHost());
        msgInner.setStoreHost(msgExt.getStoreHost());
        msgInner.setReconsumeTimes(msgExt.getReconsumeTimes());
        msgInner.setWaitStoreMsgOK(false);
        msgInner.setTransactionId(msgExt.getUserProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX));
        msgInner.setSysFlag(msgExt.getSysFlag());
        TopicFilterType topicFilterType =
            (msgInner.getSysFlag() & MessageSysFlag.MULTI_TAGS_FLAG) == MessageSysFlag.MULTI_TAGS_FLAG ? TopicFilterType.MULTI_TAG
                : TopicFilterType.SINGLE_TAG;
        long tagsCodeValue = MessageExtBrokerInner.tagsString2tagsCode(topicFilterType, msgInner.getTags());
        msgInner.setTagsCode(tagsCodeValue);
        MessageAccessor.setProperties(msgInner, msgExt.getProperties());
        msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties()));
        MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC);
        MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_REAL_QUEUE_ID);
        return msgInner;
    }
}

正常的流程下,其实生产者会对事务状态进行上报,broker根据事务状态决定是否要将Prepare消息进行删除或者转化。
那么倘若,在进行事务上报的时候,生产者挂了呢?
这个时候分布式的特性就体现了。
rocketmq是如何实现分布式的事务呢?答案在于生产者的group

如何实现分布式

其实这里并没有利用数据同步的方式。如果无法通过原先的生产者获取到事务状态,那么这个时候如何获取事务的状态呢?那就有必要对事务id以及对应的事务状态进行持久化。
然后生产者保持查询事务的方法实现逻辑一样,那这样子就可以通过任意一个生产者去查询出事务状态。

下面直接看看实现。
首先broker端中,有一个定时回查事务状态的一个定时器。
当controller初始化的时候,回查服务也会进行初始化。

回查任务的初始化
public class BrokerController {
    private void initialTransaction() {
        this.transactionalMessageService = ServiceProvider.loadClass(ServiceProvider.TRANSACTION_SERVICE_ID, TransactionalMessageService.class);
        if (null == this.transactionalMessageService) {
            this.transactionalMessageService = new TransactionalMessageServiceImpl(new TransactionalMessageBridge(this, this.getMessageStore()));
            log.warn("Load default transaction message hook service: {}", TransactionalMessageServiceImpl.class.getSimpleName());
        }
        this.transactionalMessageCheckListener = ServiceProvider.loadClass(ServiceProvider.TRANSACTION_LISTENER_ID, AbstractTransactionalMessageCheckListener.class);
        if (null == this.transactionalMessageCheckListener) {
            this.transactionalMessageCheckListener = new DefaultTransactionalMessageCheckListener();
            log.warn("Load default discard message hook service: {}", DefaultTransactionalMessageCheckListener.class.getSimpleName());
        }
        this.transactionalMessageCheckListener.setBrokerController(this);
        回查任务的初始化
        this.transactionalMessageCheckService = new TransactionalMessageCheckService(this);
    }
}
回查任务的启动

在Broker启动的时候,会顺便启动回查服务

public class BrokerController {
    public void start() throws Exception {
        省略部分代码,默认EnableDLegerCommitLog为false
        if (!messageStoreConfig.isEnableDLegerCommitLog()) {
            startProcessorByHa(messageStoreConfig.getBrokerRole());
            handleSlaveSynchronize(messageStoreConfig.getBrokerRole());
            this.registerBrokerAll(true, false, true);
        }

    private void startProcessorByHa(BrokerRole role) {
        if (BrokerRole.SLAVE != role) {
            if (this.transactionalMessageCheckService != null) {
                启动事务回查服务
                this.transactionalMessageCheckService.start();
            }
        }
    }

    }
}

从上面的代码看,回查服务也是随着broker启动的。但是需要EnableDLegerCommitLog关闭
回查服务的实现

public class TransactionalMessageCheckService extends ServiceThread {
    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.TRANSACTION_LOGGER_NAME);

    private BrokerController brokerController;

    public TransactionalMessageCheckService(BrokerController brokerController) {
        this.brokerController = brokerController;
    }

    @Override
    public String getServiceName() {
        return TransactionalMessageCheckService.class.getSimpleName();
    }

    @Override
    public void run() {
        log.info("Start transaction check service thread!");
        long checkInterval = brokerController.getBrokerConfig().getTransactionCheckInterval();
        while (!this.isStopped()) {
            this.waitForRunning(checkInterval);
        }
        log.info("End transaction check service thread!");
    }

    @Override
    protected void onWaitEnd() {
        long timeout = brokerController.getBrokerConfig().getTransactionTimeOut();
        int checkMax = brokerController.getBrokerConfig().getTransactionCheckMax();
        long begin = System.currentTimeMillis();
        log.info("Begin to check prepare message, begin time:{}", begin);
        this.brokerController.getTransactionalMessageService().check(timeout, checkMax, this.brokerController.getTransactionalMessageCheckListener());
        log.info("End to check prepare message, consumed time:{}", System.currentTimeMillis() - begin);
    }

}

从代码来看,回查服务的实现是基于单线程的循环,时间间隔为60秒。
核心执行的代码

public class TransactionalMessageCheckService extends ServiceThread {
    @Override
    protected void onWaitEnd() {
        long timeout = brokerController.getBrokerConfig().getTransactionTimeOut();
        int checkMax = brokerController.getBrokerConfig().getTransactionCheckMax();
        long begin = System.currentTimeMillis();
        对于事务的回查核心在于check方法,对于事务状态的回查在于回查监听器中实现。
        this.brokerController.getTransactionalMessageService().check(timeout, checkMax, this.brokerController.getTransactionalMessageCheckListener());
    }
}
回查事务的核心实现

主要是在于AbstractTransactionalMessageCheckListener

public abstract class AbstractTransactionalMessageCheckListener {
    public void sendCheckMessage(MessageExt msgExt) throws Exception {
        CheckTransactionStateRequestHeader checkTransactionStateRequestHeader = new CheckTransactionStateRequestHeader();
        checkTransactionStateRequestHeader.setCommitLogOffset(msgExt.getCommitLogOffset());
        checkTransactionStateRequestHeader.setOffsetMsgId(msgExt.getMsgId());
        checkTransactionStateRequestHeader.setMsgId(msgExt.getUserProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX));
        checkTransactionStateRequestHeader.setTransactionId(checkTransactionStateRequestHeader.getMsgId());
        checkTransactionStateRequestHeader.setTranStateTableOffset(msgExt.getQueueOffset());
        msgExt.setTopic(msgExt.getUserProperty(MessageConst.PROPERTY_REAL_TOPIC));
        msgExt.setQueueId(Integer.parseInt(msgExt.getUserProperty(MessageConst.PROPERTY_REAL_QUEUE_ID)));
        msgExt.setStoreSize(0);
        String groupId = msgExt.getProperty(MessageConst.PROPERTY_PRODUCER_GROUP);
        根据groupid,找到一个组中,任意一个可用的生产者实例所对应的channel
        Channel channel = brokerController.getProducerManager().getAvailableChannel(groupId);
        if (channel != null) {
            进行回查
            brokerController.getBroker2Client().checkProducerTransactionState(groupId, channel, checkTransactionStateRequestHeader, msgExt);
        } else {
            LOGGER.warn("Check transaction failed, channel is null. groupId={}", groupId);
        }
    }
}
public class Broker2Client {
    public void checkProducerTransactionState(
        final String group,
        final Channel channel,
        final CheckTransactionStateRequestHeader requestHeader,
        final MessageExt messageExt) throws Exception {
        RemotingCommand request =
            RemotingCommand.createRequestCommand(RequestCode.CHECK_TRANSACTION_STATE, requestHeader);
        request.setBody(MessageDecoder.encode(messageExt, false));
        try {
            this.brokerController.getRemotingServer().invokeOneway(channel, request, 10);
        } catch (Exception e) {
            log.error("Check transaction failed because invoke producer exception. group={}, msgId={}, error={}",
                    group, messageExt.getMsgId(), e.toString());
        }
    }
}

从上面的代码来看,回查的核心实现,其实就是找到同一个组中的任意一个可用的生产者的channel,然后进行回查,获取事务状态。


回查任务流程大概如下

当然回查中,还有俩个重要的参数,就是事务的有效时间性,以及最大的回查次数,这些都是可以进行配置的,如果事务超时,或者是回查次数超过最大回查次数,那么该prepare消息都会被直接抛弃。

那么生产者接收到回查请求如何处理呢

得知,broker发送回查请求对应的请求码为RequestCode.CHECK_TRANSACTION_STATE。所以找到对应的处理器即可
看看ClientRemotingProcessor中处理请求的代码

public class ClientRemotingProcessor extends AsyncNettyRequestProcessor implements NettyRequestProcessor {
    @Override
    public RemotingCommand processRequest(ChannelHandlerContext ctx,
        RemotingCommand request) throws RemotingCommandException {
        switch (request.getCode()) {
            case RequestCode.CHECK_TRANSACTION_STATE:
                对于事务处理的请求
                return this.checkTransactionState(ctx, request);
        }
        return null;
    }

    public RemotingCommand checkTransactionState(ChannelHandlerContext ctx,
        RemotingCommand request) throws RemotingCommandException {
        final CheckTransactionStateRequestHeader requestHeader =
            (CheckTransactionStateRequestHeader) request.decodeCommandCustomHeader(CheckTransactionStateRequestHeader.class);
        final ByteBuffer byteBuffer = ByteBuffer.wrap(request.getBody());
        final MessageExt messageExt = MessageDecoder.decode(byteBuffer);
        if (messageExt != null) {
            if (StringUtils.isNotEmpty(this.mqClientFactory.getClientConfig().getNamespace())) {
                messageExt.setTopic(NamespaceUtil
                    .withoutNamespace(messageExt.getTopic(), this.mqClientFactory.getClientConfig().getNamespace()));
            }
            通过消息获取事务id
            String transactionId = messageExt.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
            if (null != transactionId && !"".equals(transactionId)) {
                设置事务Id
                messageExt.setTransactionId(transactionId);
            }
            获取生产者的组
            final String group = messageExt.getProperty(MessageConst.PROPERTY_PRODUCER_GROUP);
            if (group != null) {
                MQProducerInner producer = this.mqClientFactory.selectProducer(group);
                if (producer != null) {
                    final String addr = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
                    检查事务的状态
                    producer.checkTransactionState(addr, messageExt, requestHeader);
                } else {
                    log.debug("checkTransactionState, pick producer by group[{}] failed", group);
                }
            } else {
                log.warn("checkTransactionState, pick producer group failed");
            }
        } else {
            log.warn("checkTransactionState, decode message failed");
        }
        return null;
    }
}

从上面代码看,最后也是通过Channel对应的ChannelHandler对读到的信息进行处理。
其中最核心的一个方法莫过于org.apache.rocketmq.client.impl.producer.MQProducerInner.checkTransactionState(String, MessageExt, CheckTransactionStateRequestHeader)
下面来看看生产者是如何处理回查请求的

public class DefaultMQProducerImpl implements MQProducerInner {
    @Override
    public void checkTransactionState(final String addr, final MessageExt msg,
        final CheckTransactionStateRequestHeader header) {
        Runnable request = new Runnable() {
            private final String brokerAddr = addr;
            private final MessageExt message = msg;
            private final CheckTransactionStateRequestHeader checkRequestHeader = header;
            private final String group = DefaultMQProducerImpl.this.defaultMQProducer.getProducerGroup();

            @Override
            public void run() {
                TransactionCheckListener transactionCheckListener = DefaultMQProducerImpl.this.checkListener();
                TransactionListener transactionListener = getCheckListener();
                if (transactionCheckListener != null || transactionListener != null) {
                    LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
                    Throwable exception = null;
                    try {
                        if (transactionCheckListener != null) {
                            最后可以很清晰的看到,是通过检查事务状态的方法获取事务状态。
                            localTransactionState = transactionCheckListener.checkLocalTransactionState(message);
                        } else if (transactionListener != null) {
                            localTransactionState = transactionListener.checkLocalTransaction(message);
                        } else {
                        }
                    } catch (Throwable e) {
                        log.error("Broker call checkTransactionState, but checkLocalTransactionState exception", e);
                        exception = e;
                    }
                    最后上报事务状态
                    this.processTransactionState(
                        localTransactionState,
                        group,
                        exception);
                } else {
                    log.warn("CheckTransactionState, pick transactionCheckListener by group[{}] failed", group);
                }
            }

            private void processTransactionState(
                final LocalTransactionState localTransactionState,
                final String producerGroup,
                final Throwable exception) {
                final EndTransactionRequestHeader thisHeader = new EndTransactionRequestHeader();
                thisHeader.setCommitLogOffset(checkRequestHeader.getCommitLogOffset());
                thisHeader.setProducerGroup(producerGroup);
                thisHeader.setTranStateTableOffset(checkRequestHeader.getTranStateTableOffset());
                thisHeader.setFromTransactionCheck(true);

                String uniqueKey = message.getProperties().get(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
                if (uniqueKey == null) {
                    uniqueKey = message.getMsgId();
                }
                thisHeader.setMsgId(uniqueKey);
                thisHeader.setTransactionId(checkRequestHeader.getTransactionId());
                switch (localTransactionState) {
                    case COMMIT_MESSAGE:
                        thisHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);
                        break;
                    case ROLLBACK_MESSAGE:
                        thisHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE);
                        log.warn("when broker check, client rollback this transaction, {}", thisHeader);
                        break;
                    case UNKNOW:
                        thisHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE);
                        log.warn("when broker check, client does not know this transaction state, {}", thisHeader);
                        break;
                    default:
                        break;
                }

                String remark = null;
                if (exception != null) {
                    remark = "checkLocalTransactionState Exception: " + RemotingHelper.exceptionSimpleDesc(exception);
                }

                try {
                    DefaultMQProducerImpl.this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, thisHeader, remark,
                        3000);
                } catch (Exception e) {
                    log.error("endTransactionOneway exception", e);
                }
            }
        };

        this.checkExecutor.submit(request);
    }
}
生产者对于回查请求的处理

最后整合一下原理


事务的实现原理

最后总结一下:

1.如何保证事务?

只有事务提交状态为commit的时候,prepare消息最终才可以转化为真实消息,以供消费。

2.事务的状态有几种?

commit提交
rollback回滚
unkonw未知
事务为回滚状态,事务超时,或者超过最大回查次数的情况下,prepare消息最终会被抛弃。
在事务状态为未知的情况下,回查服务会一直尝试获取事务状态,若事务超时,或者超过最大回查次数的情况下,prepare消息最终会被抛弃。
只有commit状态下,事务才会被提交,消息最终才可以被消费。

3.回查服务的时间间隔,以及默认最大回查次数是可以设置的,以及事务的有效时间

默认事务有效时间为6000毫秒
默认最大回查次数为15次
回查服务的时间间隔为60秒

4.如何保证分布式

一个组内的所有生产者,执行本地事务,以及回查事务的方法应该保持一致。
同时生产者对于事务id以及状态应该做持久化。
这样子只要生产者组中,有任意一个可用的生产者,便可以进行状态回查,通过事务ID,将持久化的事务状态查出。

上一篇下一篇

猜你喜欢

热点阅读