RocketMQ源码解析(十三)-事务消息
什么是事务消息
首先我们用一个场景来讲一下事务消息解决的问题。分布式消息队列多用来解决多个微服务之间的调用解耦,不会因为单个服务的服务质量问题而影响其它业务。比如电商场景下,一笔订单支付成功后可能要通知多个系统,如erp系统准备发货、商品系统扣减库存等,这中间如果使用消息队列来解决的话会有2种情况:
1)先发送消息,后更新订单状态
2)先更新订单状态,后发送消息
在第一种情况下,如果订单更新时出现问题发生回滚,消息已经发送出去了,下游系统可能会出错。
如果采用第二种方案,如果订单更新后,发送消息前因为系统宕机导致消息没发出去,则下游系统就不知道订单的最新状态。
RocketMQ的事务消息就是为了解决上面的问题,它将消息和订单状态更新这2步操作放到一个事务中,要么都成功,要么都失败。下面看下它的实现原理
RocketMQ事务消息的实现原理
首先事务消息中用到的几个概念需要明确一下:
本地事务,用户实现的业务逻辑,比如上面例子中的更新订单状态的逻辑,本地事务执行返回的结果可能有3种状态,1)COMMIT,代表本地业务逻辑执行成功,这种情况下消息应该发出;2)ROLLBACK,本地业务逻辑执行失败,消息不应该发;3)UNKNOWN,未知状态,可能是事务正在执行中出异常等,这种情况下消息系统不知道该如何处理,当前的逻辑是会直接丢弃掉,等待后续检查逻辑来处理。
Prepared消息
RocketMQ在执行本地事务之前会先发一条Prepared消息到Broker,声明事务开始,但Prepared消息不会发给Consumer。
Commit/Rollback消息
在本地事务执行结束后,会根据本地事务的状态来决定发送Commit/Rollback消息,用于结束事务。Broker收到这条消息后会把之前的Prepared消息真正投递给Consumer。
下面看下事务消息交互流程,这里直接引用阿里云文档的图:
- 这里面的半消息(Half消息)即
Prepared
消息 - 第4步即发送给
Broker
的Commit/Rollback
消息 - 在最新的开源版本的
RocketMQ
中,第5步的动作并没有启用,所以当前的开源版本如果第4步的时候失败,则这个事务就永远处于Prepared
状态直到被删除。RocketMQ
还在对这个功能做优化,后续应该会上新的实现版本。
代码实现
现在我们看下事务消息的代码实现,首先按惯例引用官方文档的demo:
public class TransactionProducer {
public static void main(String[] args) throws MQClientException, InterruptedException {
TransactionListener transactionListener = new TransactionListenerImpl();
TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name");
ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setName("client-transaction-msg-check-thread");
return thread;
}
});
//1、设置处理回查请求的executor
producer.setExecutorService(executorService);
//2、设置本地事务Listener
producer.setTransactionListener(transactionListener);
producer.start();
String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
for (int i = 0; i < 10; i++) {
try {
Message msg =
new Message("TopicTest1234", tags[i % tags.length], "KEY" + i,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
//3、发送消息
SendResult sendResult = producer.sendMessageInTransaction(msg, null);
System.out.printf("%s%n", sendResult);
Thread.sleep(10);
} catch (MQClientException | UnsupportedEncodingException e) {
e.printStackTrace();
}
}
for (int i = 0; i < 100000; i++) {
Thread.sleep(1000);
}
producer.shutdown();
}
}
从上面的代码可以看出事务消息跟普通消息使用不同的Producer
第1步,设置的executor
用来处理Broker
的回查请求,因为这个功能现在已经去掉了,所以这个executor
其实是没用的。
第2步,设置的Listener
中,用户需要实现两个方法
public interface TransactionListener {
/**
* When send transactional prepare(half) message succeed, this method will be invoked to execute local transaction.
*
* @param msg Half(prepare) message
* @param arg Custom business parameter
* @return Transaction state
*/
LocalTransactionState executeLocalTransaction(final Message msg, final Object arg);
/**
* When no response to prepare(half) message. broker will send check message to check the transaction status, and this
* method will be invoked to get local transaction status.
*
* @param msg Check message
* @return Transaction state
*/
LocalTransactionState checkLocalTransaction(final MessageExt msg);
}
第1个方法就是本地事务的实现,业务代码写在这个方法里面
第2个方法是Broker
回查消息状态的时候调用的方法,因为回查功能已经没有了,所以这个方法暂时也用不到
下面我们看下事务Producer
的代码实现
事务消息Producer
TransactionMQProducer
从DefaultMQProducer
继承,所以大体逻辑和普通的Producer
是一样的,除了start()
方法中加了针对事务消息的初始化逻辑:
@Override
public void start() throws MQClientException {
//如果Producer未设置Executor,则默认初始化一个
this.defaultMQProducerImpl.initTransactionEnv();
super.start();
}
start
第一步就是检查下用户有没有设置executor
,如果没有则默认初始化,然后就调用DefaultMQProducerImpl
的start()
方法了,这里和普通消息没有什么区别。
消息发送
事务消息发送调用的DefaultMQProducerImpl.sendMessageInTransaction()
方法
public TransactionSendResult sendMessageInTransaction(final Message msg,
final LocalTransactionExecuter localTransactionExecuter, final Object arg)
throws MQClientException {
//1、检查TransactionListener是否存在
TransactionListener transactionListener = getCheckListener();
if (null == localTransactionExecuter && null == transactionListener) {
throw new MQClientException("tranExecutor is null", null);
}
//2、消息校验,校验topic和body长度
Validators.checkMessage(msg, this.defaultMQProducer);
//3、设置消息的事务属性,为PREPARED消息
SendResult sendResult = null;
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());
try {
//4、发送消息,和发送普通消息调用同一个方法
sendResult = this.send(msg);
} catch (Exception e) {
throw new MQClientException("send message Exception", e);
}
LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
Throwable localException = null;
switch (sendResult.getSendStatus()) {
case SEND_OK: {
try {
//5、当前Broker不会返回这个值
if (sendResult.getTransactionId() != null) {
msg.putUserProperty("__transactionId__", sendResult.getTransactionId());
}
//6、使用客户端生成的唯一id作为事务ID
String transactionId = msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
if (null != transactionId && !"".equals(transactionId)) {
msg.setTransactionId(transactionId);
}
if (null != localTransactionExecuter) {//默认为空
localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg);
} else if (transactionListener != null) {
log.debug("Used new transaction API");
//7、消息发送成功,调用transactionListener执行本地事务
localTransactionState = transactionListener.executeLocalTransaction(msg, arg);
}
if (null == localTransactionState) {
localTransactionState = LocalTransactionState.UNKNOW;
}
if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) {
log.info("executeLocalTransactionBranch return {}", localTransactionState);
log.info(msg.toString());
}
} catch (Throwable e) {
log.info("executeLocalTransactionBranch exception", e);
log.info(msg.toString());
localException = e;
}
}
break;
case FLUSH_DISK_TIMEOUT:
case FLUSH_SLAVE_TIMEOUT:
case SLAVE_NOT_AVAILABLE:
//8、消息持久化失败,则事务回滚
localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;
break;
default:
break;
}
try {
//9、发送结束事务消息(Commit/Rollback)
this.endTransaction(sendResult, localTransactionState, localException);
} catch (Exception e) {
log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e);
}
TransactionSendResult transactionSendResult = new TransactionSendResult();
transactionSendResult.setSendStatus(sendResult.getSendStatus());
transactionSendResult.setMessageQueue(sendResult.getMessageQueue());
transactionSendResult.setMsgId(sendResult.getMsgId());
transactionSendResult.setQueueOffset(sendResult.getQueueOffset());
transactionSendResult.setTransactionId(sendResult.getTransactionId());
transactionSendResult.setLocalTransactionState(localTransactionState);
return transactionSendResult;
}
第3步,Prepared
消息会在消息的自定义属性中添加标识,包含消息类型和发送的ProducerGroup
第4步,提交消息到Broker
方法和普通消息调用的是同一个,实现中唯一针对事务消息的修改就是设置了消息的sysFlag
,在sendKernelImpl()
方法中:
//如果是Prepared消息,设置sysFlag
final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
if (tranMsg != null && Boolean.parseBoolean(tranMsg)) {
sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE;
}
第7步,消息发送成功,则回调TransactionListener
的实现,执行本地事务,得到本地事务的执行状态。
第8步,如果第4步中prepared
消息虽然发送成功,但Broker
持久化消息失败,本地事务不会执行,直接回滚
第9步,根据本地事务的执行状态,发送Commit/Rollback
消息给Broker
,我们看下具体实现:
public void endTransaction(
final SendResult sendResult,
final LocalTransactionState localTransactionState,
final Throwable localException) throws RemotingException, MQBrokerException, InterruptedException, UnknownHostException {
final MessageId id;
if (sendResult.getOffsetMsgId() != null) {
id = MessageDecoder.decodeMessageId(sendResult.getOffsetMsgId());
} else {
id = MessageDecoder.decodeMessageId(sendResult.getMsgId());
}
String transactionId = sendResult.getTransactionId();
//1、获取接收prepared消息的Broker地址
final String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(sendResult.getMessageQueue().getBrokerName());
EndTransactionRequestHeader requestHeader = new EndTransactionRequestHeader();
requestHeader.setTransactionId(transactionId);
//2、消息在commitLog的offset
requestHeader.setCommitLogOffset(id.getOffset());
//3、根据本地执行结果设置提交或回滚
switch (localTransactionState) {
case COMMIT_MESSAGE:
requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);
break;
case ROLLBACK_MESSAGE:
requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE);
break;
case UNKNOW:
requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE);
break;
default:
break;
}
requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
//4、设置消息在broker上的queueOffset
requestHeader.setTranStateTableOffset(sendResult.getQueueOffset());
requestHeader.setMsgId(sendResult.getMsgId());
String remark = localException != null ? ("executeLocalTransactionBranch exception: " + localException.toString()) : null;
//5、发送结束事务消息RequestCode.END_TRANSACTION,Oneway
this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, requestHeader, remark,
this.defaultMQProducer.getSendMsgTimeout());
}
第1步,获取接收prepared
消息的那个broker
地址,两个消息必须发到同一个broker
第2,4步,commit/rollback
消息需要携带原prepared
消息的commitLog offset
和queue offset
第5步,最后消息是用Oneway
的方式提交的,也就是Broker
处理无论成功还是失败,Producer
不会再做处理。这里之所以是这个逻辑,是因为RocketMQ
之前的版本是有回查逻辑的,当前最新版本把这个逻辑去掉后,确实大大影响了事务消息的可用性。
Broker处理Prepared消息
Broker
处理Prepared
消息是和普通消息用的同一个SendMessageProcessor
,所以我们之看下针对事务消息的特殊处理逻辑。
private RemotingCommand sendMessage(final ChannelHandlerContext ctx,
final RemotingCommand request,
final SendMessageContext sendMessageContext,
final SendMessageRequestHeader requestHeader) throws RemotingCommandException {
...
...
String traFlag = oriProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);
if (traFlag != null && Boolean.parseBoolean(traFlag)) {
//如果是事务消息,判断broker是否支持事务消息
if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {
response.setCode(ResponseCode.NO_PERMISSION);
response.setRemark(
"the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()
+ "] sending transaction message is forbidden");
return response;
}
//存储prepare消息
putMessageResult = this.brokerController.getTransactionalMessageService().prepareMessage(msgInner);
} else {
//8、调用MessageStore接口存储消息
putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
}
return handlePutMessageResult(putMessageResult, response, request, msgInner, responseHeader, sendMessageContext, ctx, queueIdInt);
}
Processor
的代码中可以发现,针对Prepared
消息是用的TransactionalMessageService
来处理的,最终还是跟普通消息一样调用的MessageStore
的方法来存储消息到CommitLog
,但是在存储之前对消息数据做了转换 :
//TransactionalMessageServiceImpl.prepareMessage()
@Override
public PutMessageResult prepareMessage(MessageExtBrokerInner messageInner) {
return transactionalMessageBridge.putHalfMessage(messageInner);
}
//TransactionalMessageBridge.java
public PutMessageResult putHalfMessage(MessageExtBrokerInner messageInner) {
return store.putMessage(parseHalfMessageInner(messageInner));
}
private MessageExtBrokerInner parseHalfMessageInner(MessageExtBrokerInner msgInner) {
MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC, msgInner.getTopic());
MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_QUEUE_ID,
String.valueOf(msgInner.getQueueId()));
//清除sysFlag中的事务消息状态位
msgInner.setSysFlag(
MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), MessageSysFlag.TRANSACTION_NOT_TYPE));
//事务prepare消息放入统一的topic,RMQ_SYS_TRANS_HALF_TOPIC
msgInner.setTopic(TransactionalMessageUtil.buildHalfTopic());
//queueId统一设置成0
msgInner.setQueueId(0);
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
return msgInner;
}
以上的代码可以看到,在将消息存到MessageStore
之前,会将原始的Topic
和queueId
放入自定义属性中,然后将sysFlag
设置成非事务消息,topic统一改成RMQ_SYS_TRANS_HALF_TOPIC
,queueId
设置为0。这样所有的Prepared
消息都会发到同一个topic
的同一个queue
下面。而且因为这个topic
是系统内置的,consumer
不会订阅这个topic的消息,所以Prepared
的消息是不会被Consumer
收到的。
Broker处理Commit/Rollback消息
Broker使用一个专门的EndTransactionProcessor
来处理Commit/Rollback
消息,逻辑如下:
@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws
RemotingCommandException {
...
...
//判断是来源于Producer主动发的消息还是Broker主动检查返回的消息,这里只用来记录日志
if (requestHeader.getFromTransactionCheck()) {
//log
} else {
//log
}
OperationResult result = new OperationResult();
//1、如果收到的是提交事务消息
if (MessageSysFlag.TRANSACTION_COMMIT_TYPE == requestHeader.getCommitOrRollback()) {
//2、从commitLog中查出原始的prepared消息
result = this.brokerController.getTransactionalMessageService().commitMessage(requestHeader);
if (result.getResponseCode() == ResponseCode.SUCCESS) {
//3、检查获取到的消息是否和当前消息匹配(包括ProduceGroup、queueOffset、commitLogOffset)
RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);
if (res.getCode() == ResponseCode.SUCCESS) {
//4、使用原始的prepared消息属性,构建最终发给consumer的消息
MessageExtBrokerInner msgInner = endMessageTransaction(result.getPrepareMessage());
msgInner.setSysFlag(MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), requestHeader.getCommitOrRollback()));
msgInner.setQueueOffset(requestHeader.getTranStateTableOffset());
msgInner.setPreparedTransactionOffset(requestHeader.getCommitLogOffset());
msgInner.setStoreTimestamp(result.getPrepareMessage().getStoreTimestamp());
//5、调用MessageStore的消息存储接口提交消息,使用真正的topic和queueId
RemotingCommand sendResult = sendFinalMessage(msgInner);
if (sendResult.getCode() == ResponseCode.SUCCESS) {
//6、设置Prepared消息的标记位为delete
this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
}
return sendResult;
}
return res;
}
} else if (MessageSysFlag.TRANSACTION_ROLLBACK_TYPE == requestHeader.getCommitOrRollback()) {
//7、收到的回滚事务消息
result = this.brokerController.getTransactionalMessageService().rollbackMessage(requestHeader);
if (result.getResponseCode() == ResponseCode.SUCCESS) {
RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);
if (res.getCode() == ResponseCode.SUCCESS) {
this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
}
return res;
}
}
response.setCode(result.getResponseCode());
response.setRemark(result.getResponseRemark());
return response;
}
- 当收到
Commit
消息时,Broker
会根据消息中携带的offset信息去CommitLog
中查出原来的Prepared
消息,这也就是为什么Producer
在发送最终的Commit
消息的时候一定要指定是同一个Broker
。消息查到后按照原来的topic和queueId,生成一条新的消息重新存到MessageStore
,这样这条消息就跟普通消息一样,被Consumer
收到了。
这里第6步需要注意下,消息Commit
后,理论上需要将原来的Prepared消息删除,这样Broker就能知道哪些消息一直没收到
Commit/Rollback,需要去
Producer回查状态。但是如果直接修改
CommitLog文件,这个代价是很大的,所以
RocketMQ是通过生成一个新的delete消息来标记的。这样,
Broker在检查的时候只需要看下
Prepared消息有没有对应的
delete`消息就可以了。具体代码如下:
@Override
public boolean deletePrepareMessage(MessageExt msgExt) {
if (this.transactionalMessageBridge.putOpMessage(msgExt, TransactionalMessageUtil.REMOVETAG)) {
log.info("Transaction op message write successfully. messageId={}, queueId={} msgExt:{}", msgExt.getMsgId(), msgExt.getQueueId(), msgExt);
return true;
} else {
log.error("Transaction op message write failed. messageId is {}, queueId is {}", msgExt.getMsgId(), msgExt.getQueueId());
return false;
}
}
public boolean putOpMessage(MessageExt messageExt, String opType) {
//选择和Prepared消息相同的queueId
MessageQueue messageQueue = new MessageQueue(messageExt.getTopic(),
this.brokerController.getBrokerConfig().getBrokerName(), messageExt.getQueueId());
if (TransactionalMessageUtil.REMOVETAG.equals(opType)) {
return addRemoveTagInTransactionOp(messageExt, messageQueue);
}
return true;
}
private boolean addRemoveTagInTransactionOp(MessageExt messageExt, MessageQueue messageQueue) {
//message的topic为RMQ_SYS_TRANS_OP_HALF_TOPIC
//消息的tags值是d,body中存储的是prepared消息的queueOffset
Message message = new Message(TransactionalMessageUtil.buildOpTopic(), TransactionalMessageUtil.REMOVETAG,
String.valueOf(messageExt.getQueueOffset()).getBytes(TransactionalMessageUtil.charset));
writeOp(message, messageQueue);
return true;
}
- 当收到
Rollback
事务消息,则不需要重新生成新消息发送,只需要将原来的消息标记位置成delete就可以了。
总结
事务消息通过2次消息确认和Producer
回调用户本地事务,来解决用户业务逻辑和消息发送的原子性问题。当前版本中事务消息因为性能问题取消了Broker
对长时间未delete的Prepared
消息的状态回查,导致事务消息的高可用有所降低。如果要使用事务消息需要等待后期版本更新,或者用户自己实现回查逻辑。