(十五)事务的实现原理
在日常的代码中,一个操作中,往往包含着各种业务,比如一个A转账给B,A扣钱,B加钱,只有这俩个操作完全成功,这个操作才算是完成,任意一个操作失败都会导致整个操作失败。
对于消息队列的应用也会存在以上场景,有时我们发消息的同时,也需要进行其他操作。
如果操作失败的同时,消息发出去了怎么办,所以这里需要引入事务的支持。
rocketmq是在于4.3.0引入分布式事务。如果要使用使用的话,需要使用4.3.0及以后的版本。
由于整个过程其实还是比较复杂的,所以直接画了个图。方便理解。
rocketmq采取的是消息确认的机制。
如何使用
先来看看TransactionListener这个类
TransactionListener
public interface TransactionListener {
LocalTransactionState executeLocalTransaction(final Message msg, final Object arg);
LocalTransactionState checkLocalTransaction(final MessageExt msg);
}
上面总共2个方法。
executeLocalTransaction执行本地事务
checkLocalTransaction检查本地事务
只要实现好这个接口就可以了。
下面来看看demo
public static void testTransaction() throws MQClientException, UnsupportedEncodingException {
设置生产者的组
TransactionMQProducer tp = new TransactionMQProducer("group");
设置注册中心的地址
tp.setInstanceName("127.0.0.1:9876");
实现事务监听器
tp.setTransactionListener(new TransactionListener() {
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
return LocalTransactionState.UNKNOW;
}
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
System.out.println(msg.getTransactionId());
return LocalTransactionState.UNKNOW;
}
});
线程池-用于处理事务状态回查(broker会定时回查)
ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setName("client-transaction-msg-check-thread");
return thread;
}
});
tp.setExecutorService(executorService);
tp.start();
Message message = new Message("transaction", "user", new String("userId=1").getBytes("UTF-8"));
tp.sendMessageInTransaction(message, 1);
}
下面分别说说这俩个方法的作用
executeLocalTransaction
执行本地事务,返回事务状态
checkLocalTransaction
检查事务状态,在executeLocalTransaction执行完后,生产者会将事务状态进行上报,broker可以根据事务状态决定是否将消息转发(转发到真实队列,消费者从而进行消费)。
倘若在生产者进行事务状态上报的时候,生产者挂了,那么这个时候,Broker会选择同一个组的生产者进行状态回查。
所以如果要实现事务,这里的话,我们只需要去实现事务监听器里面的方法就可以了。至于逻辑的话,需要自己去实现。
实现原理
事务的实现原理
事务生产者是如何发送消息的?
先从发送方法定位。
public class DefaultMQProducerImpl implements MQProducerInner {
public TransactionSendResult sendMessageInTransaction(final Message msg,
final LocalTransactionExecuter localTransactionExecuter, final Object arg)
throws MQClientException {
1.检查事务监听器,没有设置事务监听器会报错
TransactionListener transactionListener = getCheckListener();
if (null == localTransactionExecuter && null == transactionListener) {
throw new MQClientException("tranExecutor is null", null);
}
// ignore DelayTimeLevel parameter
忽略延时级别
if (msg.getDelayTimeLevel() != 0) {
MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_DELAY_TIME_LEVEL);
}
检查消息
Validators.checkMessage(msg, this.defaultMQProducer);
SendResult sendResult = null;
2.设置事务属性,以及消息归属的生产者的组
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());
try {
3.进行消息的发送
sendResult = this.send(msg);
} catch (Exception e) {
throw new MQClientException("send message Exception", e);
}
默认事务状态为未知
LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
Throwable localException = null;
根据消息发送的状态进行处理
switch (sendResult.getSendStatus()) {
消息发送成功
case SEND_OK: {
try {
if (sendResult.getTransactionId() != null) {
msg.putUserProperty("__transactionId__", sendResult.getTransactionId());
}
获取事务Id
String transactionId = msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
if (null != transactionId && !"".equals(transactionId)) {
msg.setTransactionId(transactionId);
}
事务执行器为空,所以不会执行此处代码
if (null != localTransactionExecuter) {
localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg);
} else if (transactionListener != null) {
4.通过事务监听器执行本地事务,返回本地事务状态
log.debug("Used new transaction API");
localTransactionState = transactionListener.executeLocalTransaction(msg, arg);
}
if (null == localTransactionState) {
若本地事务状态结果为空,默认使用未知状态
localTransactionState = LocalTransactionState.UNKNOW;
}
if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) {
log.info("executeLocalTransactionBranch return {}", localTransactionState);
log.info(msg.toString());
}
} catch (Throwable e) {
log.info("executeLocalTransactionBranch exception", e);
log.info(msg.toString());
localException = e;
}
}
break;
case FLUSH_DISK_TIMEOUT:
case FLUSH_SLAVE_TIMEOUT:
case SLAVE_NOT_AVAILABLE:
localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;
break;
default:
break;
}
try {
5.结束本地事务,对事务结果进行上报
this.endTransaction(sendResult, localTransactionState, localException);
} catch (Exception e) {
log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e);
}
TransactionSendResult transactionSendResult = new TransactionSendResult();
transactionSendResult.setSendStatus(sendResult.getSendStatus());
transactionSendResult.setMessageQueue(sendResult.getMessageQueue());
transactionSendResult.setMsgId(sendResult.getMsgId());
transactionSendResult.setQueueOffset(sendResult.getQueueOffset());
transactionSendResult.setTransactionId(sendResult.getTransactionId());
transactionSendResult.setLocalTransactionState(localTransactionState);
return transactionSendResult;
}
}
发送消息的流程如下
事务消息生产者发送消息的流程如下
从上面看到,本地事务执行完后,会进行上报。
broker端会根据,上报的本地事务的状态,决定该消息是否可以被消费。
那么broker端是如何处理事务状态上报的
先来跟踪一下代码,查看上报事务状态的实现。
public class DefaultMQProducerImpl implements MQProducerInner {
public void endTransaction(
final SendResult sendResult,
final LocalTransactionState localTransactionState,
final Throwable localException) throws RemotingException, MQBrokerException, InterruptedException, UnknownHostException {
final MessageId id;
if (sendResult.getOffsetMsgId() != null) {
id = MessageDecoder.decodeMessageId(sendResult.getOffsetMsgId());
} else {
id = MessageDecoder.decodeMessageId(sendResult.getMsgId());
}
String transactionId = sendResult.getTransactionId();
final String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(sendResult.getMessageQueue().getBrokerName());
EndTransactionRequestHeader requestHeader = new EndTransactionRequestHeader();
requestHeader.setTransactionId(transactionId);
requestHeader.setCommitLogOffset(id.getOffset());
根据事务状态,设置请求头
switch (localTransactionState) {
case COMMIT_MESSAGE:
requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);
break;
case ROLLBACK_MESSAGE:
requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE);
break;
case UNKNOW:
requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE);
break;
default:
break;
}
requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
requestHeader.setTranStateTableOffset(sendResult.getQueueOffset());
requestHeader.setMsgId(sendResult.getMsgId());
String remark = localException != null ? ("executeLocalTransactionBranch exception: " + localException.toString()) : null;
具体看看这个方法的实现
this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, requestHeader, remark,
this.defaultMQProducer.getSendMsgTimeout());
}
}
public class MQClientAPIImpl {
public void endTransactionOneway(
final String addr,
final EndTransactionRequestHeader requestHeader,
final String remark,
final long timeoutMillis
) throws RemotingException, MQBrokerException, InterruptedException {
这里就很明显了,RequestCode.END_TRANSACTION。
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.END_TRANSACTION, requestHeader);
request.setRemark(remark);
this.remotingClient.invokeOneway(addr, request, timeoutMillis);
}
}
通过生产者中进行事务上报的方法,可以确定请求对应的Code为RequestCode.END_TRANSACTION。
所以只需要通过brokerController找到对应的processor即可。
通过requestCode确定最后处理请求的处理器为EndTransactionProcessor。
EndTransactionProcessor是如何处理上报的事务的
public class EndTransactionProcessor extends AsyncNettyRequestProcessor implements NettyRequestProcessor {
@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws
RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
final EndTransactionRequestHeader requestHeader =
(EndTransactionRequestHeader)request.decodeCommandCustomHeader(EndTransactionRequestHeader.class);
LOGGER.debug("Transaction request:{}", requestHeader);
只有master可以去结束事务
if (BrokerRole.SLAVE == brokerController.getMessageStoreConfig().getBrokerRole()) {
response.setCode(ResponseCode.SLAVE_NOT_AVAILABLE);
LOGGER.warn("Message store is slave mode, so end transaction is forbidden. ");
return response;
}
省略部分代码。。。日志打印相关
OperationResult result = new OperationResult();
if (MessageSysFlag.TRANSACTION_COMMIT_TYPE == requestHeader.getCommitOrRollback()) {
如果事务状态为提交,则将消息进行提交(转发到真实队列中),消费者可以进行消费
result = this.brokerController.getTransactionalMessageService().commitMessage(requestHeader);
if (result.getResponseCode() == ResponseCode.SUCCESS) {
RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);
if (res.getCode() == ResponseCode.SUCCESS) {
提交成功则结束事务,将prepare消息转为真实消息。
MessageExtBrokerInner msgInner = endMessageTransaction(result.getPrepareMessage());
msgInner.setSysFlag(MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), requestHeader.getCommitOrRollback()));
msgInner.setQueueOffset(requestHeader.getTranStateTableOffset());
msgInner.setPreparedTransactionOffset(requestHeader.getCommitLogOffset());
msgInner.setStoreTimestamp(result.getPrepareMessage().getStoreTimestamp());
MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_TRANSACTION_PREPARED);
将真实消息进行保存。
RemotingCommand sendResult = sendFinalMessage(msgInner);
if (sendResult.getCode() == ResponseCode.SUCCESS) {
保存成功,删除prepare消息
this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
}
return sendResult;
}
return res;
}
} else if (MessageSysFlag.TRANSACTION_ROLLBACK_TYPE == requestHeader.getCommitOrRollback()) {
result = this.brokerController.getTransactionalMessageService().rollbackMessage(requestHeader);
if (result.getResponseCode() == ResponseCode.SUCCESS) {
RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);
if (res.getCode() == ResponseCode.SUCCESS) {
回滚---直接删除prepare消息即可
this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
}
return res;
}
}
response.setCode(result.getResponseCode());
response.setRemark(result.getResponseRemark());
return response;
}
}
处理的逻辑比较简单。
1.如果是事务状态为提交
则将消息从prepare消息转为真实消息,进行保存。保存成功后,将prepare消息删除。
2.如果事务状态是回滚
将prepare消息删除即可。
其实一开始发送的消息都是prepare消息,只有当事务状态为commit的时候,才会将prepare消息转化为真实消息,存放到真正对应的主题对应的队列中。这个时候消费者才可以进行消费。
原理如下
broker对于事务上报的处理
最后看看消息还原的代码
public class EndTransactionProcessor extends AsyncNettyRequestProcessor implements NettyRequestProcessor {
private MessageExtBrokerInner endMessageTransaction(MessageExt msgExt) {
MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
设置真实的topic
msgInner.setTopic(msgExt.getUserProperty(MessageConst.PROPERTY_REAL_TOPIC));
设置真实的队列Id
msgInner.setQueueId(Integer.parseInt(msgExt.getUserProperty(MessageConst.PROPERTY_REAL_QUEUE_ID)));
msgInner.setBody(msgExt.getBody());
msgInner.setFlag(msgExt.getFlag());
msgInner.setBornTimestamp(msgExt.getBornTimestamp());
msgInner.setBornHost(msgExt.getBornHost());
msgInner.setStoreHost(msgExt.getStoreHost());
msgInner.setReconsumeTimes(msgExt.getReconsumeTimes());
msgInner.setWaitStoreMsgOK(false);
msgInner.setTransactionId(msgExt.getUserProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX));
msgInner.setSysFlag(msgExt.getSysFlag());
TopicFilterType topicFilterType =
(msgInner.getSysFlag() & MessageSysFlag.MULTI_TAGS_FLAG) == MessageSysFlag.MULTI_TAGS_FLAG ? TopicFilterType.MULTI_TAG
: TopicFilterType.SINGLE_TAG;
long tagsCodeValue = MessageExtBrokerInner.tagsString2tagsCode(topicFilterType, msgInner.getTags());
msgInner.setTagsCode(tagsCodeValue);
MessageAccessor.setProperties(msgInner, msgExt.getProperties());
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties()));
MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC);
MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_REAL_QUEUE_ID);
return msgInner;
}
}
正常的流程下,其实生产者会对事务状态进行上报,broker根据事务状态决定是否要将Prepare消息进行删除或者转化。
那么倘若,在进行事务上报的时候,生产者挂了呢?
这个时候分布式的特性就体现了。
rocketmq是如何实现分布式的事务呢?答案在于生产者的group
如何实现分布式
其实这里并没有利用数据同步的方式。如果无法通过原先的生产者获取到事务状态,那么这个时候如何获取事务的状态呢?那就有必要对事务id以及对应的事务状态进行持久化。
然后生产者保持查询事务的方法实现逻辑一样,那这样子就可以通过任意一个生产者去查询出事务状态。
下面直接看看实现。
首先broker端中,有一个定时回查事务状态的一个定时器。
当controller初始化的时候,回查服务也会进行初始化。
回查任务的初始化
public class BrokerController {
private void initialTransaction() {
this.transactionalMessageService = ServiceProvider.loadClass(ServiceProvider.TRANSACTION_SERVICE_ID, TransactionalMessageService.class);
if (null == this.transactionalMessageService) {
this.transactionalMessageService = new TransactionalMessageServiceImpl(new TransactionalMessageBridge(this, this.getMessageStore()));
log.warn("Load default transaction message hook service: {}", TransactionalMessageServiceImpl.class.getSimpleName());
}
this.transactionalMessageCheckListener = ServiceProvider.loadClass(ServiceProvider.TRANSACTION_LISTENER_ID, AbstractTransactionalMessageCheckListener.class);
if (null == this.transactionalMessageCheckListener) {
this.transactionalMessageCheckListener = new DefaultTransactionalMessageCheckListener();
log.warn("Load default discard message hook service: {}", DefaultTransactionalMessageCheckListener.class.getSimpleName());
}
this.transactionalMessageCheckListener.setBrokerController(this);
回查任务的初始化
this.transactionalMessageCheckService = new TransactionalMessageCheckService(this);
}
}
回查任务的启动
在Broker启动的时候,会顺便启动回查服务
public class BrokerController {
public void start() throws Exception {
省略部分代码,默认EnableDLegerCommitLog为false
if (!messageStoreConfig.isEnableDLegerCommitLog()) {
startProcessorByHa(messageStoreConfig.getBrokerRole());
handleSlaveSynchronize(messageStoreConfig.getBrokerRole());
this.registerBrokerAll(true, false, true);
}
private void startProcessorByHa(BrokerRole role) {
if (BrokerRole.SLAVE != role) {
if (this.transactionalMessageCheckService != null) {
启动事务回查服务
this.transactionalMessageCheckService.start();
}
}
}
}
}
从上面的代码看,回查服务也是随着broker启动的。但是需要EnableDLegerCommitLog关闭
回查服务的实现
public class TransactionalMessageCheckService extends ServiceThread {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.TRANSACTION_LOGGER_NAME);
private BrokerController brokerController;
public TransactionalMessageCheckService(BrokerController brokerController) {
this.brokerController = brokerController;
}
@Override
public String getServiceName() {
return TransactionalMessageCheckService.class.getSimpleName();
}
@Override
public void run() {
log.info("Start transaction check service thread!");
long checkInterval = brokerController.getBrokerConfig().getTransactionCheckInterval();
while (!this.isStopped()) {
this.waitForRunning(checkInterval);
}
log.info("End transaction check service thread!");
}
@Override
protected void onWaitEnd() {
long timeout = brokerController.getBrokerConfig().getTransactionTimeOut();
int checkMax = brokerController.getBrokerConfig().getTransactionCheckMax();
long begin = System.currentTimeMillis();
log.info("Begin to check prepare message, begin time:{}", begin);
this.brokerController.getTransactionalMessageService().check(timeout, checkMax, this.brokerController.getTransactionalMessageCheckListener());
log.info("End to check prepare message, consumed time:{}", System.currentTimeMillis() - begin);
}
}
从代码来看,回查服务的实现是基于单线程的循环,时间间隔为60秒。
核心执行的代码
public class TransactionalMessageCheckService extends ServiceThread {
@Override
protected void onWaitEnd() {
long timeout = brokerController.getBrokerConfig().getTransactionTimeOut();
int checkMax = brokerController.getBrokerConfig().getTransactionCheckMax();
long begin = System.currentTimeMillis();
对于事务的回查核心在于check方法,对于事务状态的回查在于回查监听器中实现。
this.brokerController.getTransactionalMessageService().check(timeout, checkMax, this.brokerController.getTransactionalMessageCheckListener());
}
}
回查事务的核心实现
主要是在于AbstractTransactionalMessageCheckListener
public abstract class AbstractTransactionalMessageCheckListener {
public void sendCheckMessage(MessageExt msgExt) throws Exception {
CheckTransactionStateRequestHeader checkTransactionStateRequestHeader = new CheckTransactionStateRequestHeader();
checkTransactionStateRequestHeader.setCommitLogOffset(msgExt.getCommitLogOffset());
checkTransactionStateRequestHeader.setOffsetMsgId(msgExt.getMsgId());
checkTransactionStateRequestHeader.setMsgId(msgExt.getUserProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX));
checkTransactionStateRequestHeader.setTransactionId(checkTransactionStateRequestHeader.getMsgId());
checkTransactionStateRequestHeader.setTranStateTableOffset(msgExt.getQueueOffset());
msgExt.setTopic(msgExt.getUserProperty(MessageConst.PROPERTY_REAL_TOPIC));
msgExt.setQueueId(Integer.parseInt(msgExt.getUserProperty(MessageConst.PROPERTY_REAL_QUEUE_ID)));
msgExt.setStoreSize(0);
String groupId = msgExt.getProperty(MessageConst.PROPERTY_PRODUCER_GROUP);
根据groupid,找到一个组中,任意一个可用的生产者实例所对应的channel
Channel channel = brokerController.getProducerManager().getAvailableChannel(groupId);
if (channel != null) {
进行回查
brokerController.getBroker2Client().checkProducerTransactionState(groupId, channel, checkTransactionStateRequestHeader, msgExt);
} else {
LOGGER.warn("Check transaction failed, channel is null. groupId={}", groupId);
}
}
}
public class Broker2Client {
public void checkProducerTransactionState(
final String group,
final Channel channel,
final CheckTransactionStateRequestHeader requestHeader,
final MessageExt messageExt) throws Exception {
RemotingCommand request =
RemotingCommand.createRequestCommand(RequestCode.CHECK_TRANSACTION_STATE, requestHeader);
request.setBody(MessageDecoder.encode(messageExt, false));
try {
this.brokerController.getRemotingServer().invokeOneway(channel, request, 10);
} catch (Exception e) {
log.error("Check transaction failed because invoke producer exception. group={}, msgId={}, error={}",
group, messageExt.getMsgId(), e.toString());
}
}
}
从上面的代码来看,回查的核心实现,其实就是找到同一个组中的任意一个可用的生产者的channel,然后进行回查,获取事务状态。
回查任务流程大概如下
当然回查中,还有俩个重要的参数,就是事务的有效时间性,以及最大的回查次数,这些都是可以进行配置的,如果事务超时,或者是回查次数超过最大回查次数,那么该prepare消息都会被直接抛弃。
那么生产者接收到回查请求如何处理呢
得知,broker发送回查请求对应的请求码为RequestCode.CHECK_TRANSACTION_STATE。所以找到对应的处理器即可
看看ClientRemotingProcessor中处理请求的代码
public class ClientRemotingProcessor extends AsyncNettyRequestProcessor implements NettyRequestProcessor {
@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
switch (request.getCode()) {
case RequestCode.CHECK_TRANSACTION_STATE:
对于事务处理的请求
return this.checkTransactionState(ctx, request);
}
return null;
}
public RemotingCommand checkTransactionState(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
final CheckTransactionStateRequestHeader requestHeader =
(CheckTransactionStateRequestHeader) request.decodeCommandCustomHeader(CheckTransactionStateRequestHeader.class);
final ByteBuffer byteBuffer = ByteBuffer.wrap(request.getBody());
final MessageExt messageExt = MessageDecoder.decode(byteBuffer);
if (messageExt != null) {
if (StringUtils.isNotEmpty(this.mqClientFactory.getClientConfig().getNamespace())) {
messageExt.setTopic(NamespaceUtil
.withoutNamespace(messageExt.getTopic(), this.mqClientFactory.getClientConfig().getNamespace()));
}
通过消息获取事务id
String transactionId = messageExt.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
if (null != transactionId && !"".equals(transactionId)) {
设置事务Id
messageExt.setTransactionId(transactionId);
}
获取生产者的组
final String group = messageExt.getProperty(MessageConst.PROPERTY_PRODUCER_GROUP);
if (group != null) {
MQProducerInner producer = this.mqClientFactory.selectProducer(group);
if (producer != null) {
final String addr = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
检查事务的状态
producer.checkTransactionState(addr, messageExt, requestHeader);
} else {
log.debug("checkTransactionState, pick producer by group[{}] failed", group);
}
} else {
log.warn("checkTransactionState, pick producer group failed");
}
} else {
log.warn("checkTransactionState, decode message failed");
}
return null;
}
}
从上面代码看,最后也是通过Channel对应的ChannelHandler对读到的信息进行处理。
其中最核心的一个方法莫过于org.apache.rocketmq.client.impl.producer.MQProducerInner.checkTransactionState(String, MessageExt, CheckTransactionStateRequestHeader)
下面来看看生产者是如何处理回查请求的
public class DefaultMQProducerImpl implements MQProducerInner {
@Override
public void checkTransactionState(final String addr, final MessageExt msg,
final CheckTransactionStateRequestHeader header) {
Runnable request = new Runnable() {
private final String brokerAddr = addr;
private final MessageExt message = msg;
private final CheckTransactionStateRequestHeader checkRequestHeader = header;
private final String group = DefaultMQProducerImpl.this.defaultMQProducer.getProducerGroup();
@Override
public void run() {
TransactionCheckListener transactionCheckListener = DefaultMQProducerImpl.this.checkListener();
TransactionListener transactionListener = getCheckListener();
if (transactionCheckListener != null || transactionListener != null) {
LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
Throwable exception = null;
try {
if (transactionCheckListener != null) {
最后可以很清晰的看到,是通过检查事务状态的方法获取事务状态。
localTransactionState = transactionCheckListener.checkLocalTransactionState(message);
} else if (transactionListener != null) {
localTransactionState = transactionListener.checkLocalTransaction(message);
} else {
}
} catch (Throwable e) {
log.error("Broker call checkTransactionState, but checkLocalTransactionState exception", e);
exception = e;
}
最后上报事务状态
this.processTransactionState(
localTransactionState,
group,
exception);
} else {
log.warn("CheckTransactionState, pick transactionCheckListener by group[{}] failed", group);
}
}
private void processTransactionState(
final LocalTransactionState localTransactionState,
final String producerGroup,
final Throwable exception) {
final EndTransactionRequestHeader thisHeader = new EndTransactionRequestHeader();
thisHeader.setCommitLogOffset(checkRequestHeader.getCommitLogOffset());
thisHeader.setProducerGroup(producerGroup);
thisHeader.setTranStateTableOffset(checkRequestHeader.getTranStateTableOffset());
thisHeader.setFromTransactionCheck(true);
String uniqueKey = message.getProperties().get(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
if (uniqueKey == null) {
uniqueKey = message.getMsgId();
}
thisHeader.setMsgId(uniqueKey);
thisHeader.setTransactionId(checkRequestHeader.getTransactionId());
switch (localTransactionState) {
case COMMIT_MESSAGE:
thisHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);
break;
case ROLLBACK_MESSAGE:
thisHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE);
log.warn("when broker check, client rollback this transaction, {}", thisHeader);
break;
case UNKNOW:
thisHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE);
log.warn("when broker check, client does not know this transaction state, {}", thisHeader);
break;
default:
break;
}
String remark = null;
if (exception != null) {
remark = "checkLocalTransactionState Exception: " + RemotingHelper.exceptionSimpleDesc(exception);
}
try {
DefaultMQProducerImpl.this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, thisHeader, remark,
3000);
} catch (Exception e) {
log.error("endTransactionOneway exception", e);
}
}
};
this.checkExecutor.submit(request);
}
}
生产者对于回查请求的处理
最后整合一下原理
事务的实现原理
最后总结一下:
1.如何保证事务?
只有事务提交状态为commit的时候,prepare消息最终才可以转化为真实消息,以供消费。
2.事务的状态有几种?
commit提交
rollback回滚
unkonw未知
事务为回滚状态,事务超时,或者超过最大回查次数的情况下,prepare消息最终会被抛弃。
在事务状态为未知的情况下,回查服务会一直尝试获取事务状态,若事务超时,或者超过最大回查次数的情况下,prepare消息最终会被抛弃。
只有commit状态下,事务才会被提交,消息最终才可以被消费。
3.回查服务的时间间隔,以及默认最大回查次数是可以设置的,以及事务的有效时间
默认事务有效时间为6000毫秒
默认最大回查次数为15次
回查服务的时间间隔为60秒
4.如何保证分布式
一个组内的所有生产者,执行本地事务,以及回查事务的方法应该保持一致。
同时生产者对于事务id以及状态应该做持久化。
这样子只要生产者组中,有任意一个可用的生产者,便可以进行状态回查,通过事务ID,将持久化的事务状态查出。