对事务消息的源码分析
我大概画了生产者和broker的交互
首先发送半消息,broker上接收消息,topic为RMQ_SYS_TRANS_HALF_TOPIC。
如果本地执行事务,提交消息 LocalTransactionState.COMMIT_MESSAGE。我们看看broker是如何处理的
然后在commitlog里查询到该消息
接着我们看看这个sendFinalMessage方法,就是提交真正的消息队列seq_topic1
下面我们看看deletePrepareMessage这个方法,要把半消息删除掉的逻辑。
下面我们看看RMQ_SYS_TRANS_OP_HALF_TOPIC这个topic,我们将RMQ_SYS_TRANS_HALF_TOPIC里的这个消息,存储到RMQ_SYS_TRANS_OP_HALF_TOPIC,其中body是该半消息在RMQ_SYS_TRANS_HALF_TOPIC的queueOffset。
如果客户端返回的是 LocalTransactionState.ROLLBACK_MESSAGE,则不再存储真正的seq_topic1,剩下的逻辑和commit大致相同。
下面是我们验证broker check消息,我们先返回unkown类型
当半消息发送成功后,回调事务监听器,存储本地事务执行结果,返回broker本地事务执行结果,我们看下由于执行本地事务的时候,返回的是unknown状态,所以肯定会check,所以此时seq_topic1并未生成,在check方法里 返回COMMIT_MESSAGE之后我们再看看。
下面我们来分析分析,定时check的过程
我们首先取我们从RMQ_SYS_TRANS_OP_HALF_TOPIC取消息,看下removeMap这个map key是RMQ_SYS_TRANS_HALF_TOPIC的queueOffset value是RMQ_SYS_TRANS_OP_HALF_TOPIC 的queueOffset。
如果removeMap包含i,则说明这个RMQ_SYS_TRANS_HALF_TOPIC消息已经被确认或者回滚 就不要check处理了。我们看下needDiscard如果check次数大于最大事务check次数,则put消息到TRANS_CHECK_MAX_TIME_TOPIC。
具体细节就不扣了,后面就是check消息 ,然后客户端确认本地事务执行结果,然后commit or rollback。 更新RMQ_SYS_TRANS_HALF_TOPIC和RMQ_SYS_TRANS_OP_HALF_TOPIC 的consumeoffset