Spring Cloud

Seata分布式事务之TM、RM、TC源码分析

2019-09-15  本文已影响0人  Blog

引言

本篇文章着重点在于调用流程分析,根据业务的发起到结束对Seata的TM、RM、TC模块进行源码调用过程分析,选用Seata版本为0.7.1版本,本篇文章分析均为Seata的AT事务,TM、RM模块分析的比较单一,只分析了逻辑调用,在分析TC模块时候才具体的结合TM、RM模块进行逻辑调用的全过程交互分析

时序图

笔者通过绘制时序图,我们可以清晰的知道在集成Seata、ShardingSphere、Dubbo之后,我们插入一条数据的整个内部调用链逻辑

TM模块分析

比较重要的类TransactionalTemplate、DefaultGlobalTransaction、DefaultTransactionManager

查看TransactionalTemplate源码可知

在全局事务拦截器GlobalTransactionalInterceptor中调用excute后,调用过程如上图所示,虽然注释信息已经解释的很详细了,首先获取全局事务GlobalTransaction,默认实现类为DefaultGlobalTransaction,然后开启全局事务beginTransaction

TransactionManager是一个自定义SPI接口

默认实现类为DefaultTransactionManager.继续跟踪begin过程

通过将消息发给TC,最终调用到AbstractRpcRemoting类下面的sendAsyncRequest消息发送接口,该方法中重要的地方,消息未发送就唤醒mergeLock同步的对象

然后就能找到AbstractRpcRemotingClient类下面的MergedSendRunnable消息发送线程,因为该线程是在初始化TMClient(TmRpcClient)和RMClient(RmRpcClient)的时候一并进行初始化

知道了消息发送,我们看看消息的返回结果是怎么得到的

消息的获取运用了Future模式,消息返回与赋值在哪?因为我们消息发送和接收肯定都是异步发送和接收,如果对Netty比较熟悉的同学可能会直接知道消息切入点,消息接收位于AbstractRpcRemotingClient类channelRead接口,消息的返回和赋值都是在这里进行,不熟悉Netty的同学也不必纠结,可以MessageFuture类查看哪里进行setResultMessage的

根据消息ID从futures获取MessageFuture并设置消息返回,因为是Future模式,所以就拿到了消息的返回结果

我们已开启全局事务为例分析了消息的收发,在TC/Seata-server模块中收发消息基本都一致,所以不在进一步分析TC模块中的消息收发。

我们着重分析TM模块中的以下部分

我们通过时序图可知,当我们执行业务时候,如果业务出现异常,那么该异常会被捕获,然后通知TC进行全局回滚操作,如果没有异常,那么就进行全局二阶段提交操作。但是可能会有同学比较好奇上图中的rs = business.execute();这个是个回调接口,回调实现类

这个简单理解就是Spring AOP,会继续调用增强链中的下一个(调用下一个拦截器),最后调用到我们的目标执行方法,如下图所示

比如我们使用了Dubbo,那么当我们执行orderService.insertOrder(orderEntity);在这个方法体中,如果有异常,那么completeTransactionAfterThrowing会执行,这里简单说明下,无论是当前模块的异常还是调用到下游出现异常,如使用Dubbo调用,我们都可以理解成这些都是属于同步调用,上游模块获取到的异常信息可能会是rpc异常,也可能是反射异常,也可能是下游主动抛出的异常,所以上游模块只能获取当前模块的完整异常信息,获取到下游的异常不是完全正确的。TM模块分析完毕

RM模块分析

比较重要的类DataSourceProxy、ConnectionProxy、PreparedStatementProxy、ExecuteTemplate,大致调用关系为DataSourceProxy获取连接ConnectionProxy,ConnectionProxy获取预编译PreparedStatementProxy,PreparedStatementProxy获取SQL执行器ExecuteTemplate。我们已Insert举例,查看ExecuteTemplate源码

通过解析SQL得出不同的执行器,这里我们会执行InsertExecutor,唯一可能需要注意的就是SelectForUpdateExecutor这个执行器,简单说下这个执行器的业务场景,因为RM是当前模块若没有异常就会提交一阶段数据入库,但是,往往我们当前模块可能会有某些业务接口,这些业务接口需要的是二阶段的最终数据,所以这里我们就可以使用Seata的@GlobalLock全局锁,这个会一直轮训直到获取到二阶段最终数据.有兴趣的同学可以仔细研究SelectForUpdateExecutor执行器.返回来,我们还是继续分析InsertExecutor,查看rs = executor.execute(args);源码,最终会调用到以下接口

判断是否全局事务,绑定XID,以及判断是否有全局锁注解,全局锁注解使用上文已介绍

判断是否自动提交事务,一般都是自动,若是自动提交事务会进行设置为false,会调用到以下代码

这个地方beforeImage主要是通过解析用户执行SQL,然后记录执行SQL前的快照,然后执行statementCallback.execute继而调用到以下方法执行SQL语句

afterImage同理类推,记录SQL执行后的快照,prepareUndoLog准备好undolog数据,也就是回滚表undo_log中即将写入的数据

connectionProxy.commit();调用到如下方法

首先判断是否是全局事务,若不是全局事务,也不是全局锁,那么直接提交数据,直接分析全局事务方法

首先注册当前分支事务,然后判断当前上下文是否有undolog数据,数据就是上文介绍的,在执行完SQL之后会执行prepareUndoLog,内部就会拼装undo_log回滚表数据

最后执行commit提交数据,然后上报分支事务状态,至此整个RM分支本地事务一阶段已完成,写到这里,可能还是会有同学有疑问,比如register注册分支事务是如何和TC模块交互的,TC模块的调用流程又是什么!这些我们在后文讲TC模块一起分析下流程

TC模块分析

比较重要的类AbstractTCInboundHandler、DefaultCoordinator、DefaultCore

我们看看Server类都做了什么事情.因为TC模块基于Netty框架,所以在研究TC模块的时候,若对Netty框架完全不了解的同学,可以先阅读下Netty框架相关文档,比如定义消息接收类,每个回调代表什么意思,发送消息给客户端如何进行。有了一定了解之后在阅读TC模块会更清晰。

言归正传,通过Server类,我们看到通过创建和启动RpcServer来进行和TM和RM交互,继续分析,设置RpcServer的handler-> DefaultCoordinator协调器.DefaultCoordinator继承AbstractTCInboundHandler。然后RpcServer继承AbstractRpcRemotingServer,然后设置Netty框架所需eventLoopGroupWorker、eventLoopGroupBoss,最后启动

所有消息入口为AbstractRpcRemoting类中channelRead方法,如果有同学说为什么是这个,Netty框架规则就是这样!

下面我们通过RM模块注册一阶段本地事务举例,详细分析整个RM和TC交互过程

通过查看源码最终调用到以下方法

这个resourceManagers在哪里赋值的?跟踪发现是静态初始化实例通过自定义SPI扩展接口通过BranchType进行赋值

我们上面获取的是AT类型ResourceManager,所以实现类为DataSourceManager,最终调用到AbstractResourceManager类下branchRegister方法

消息发送sendMsgWithResponse在TM模块已经介绍过,忘了的同学可以阅读上文中的分析。

我们着重讲解消息发送到TC之后,TC模块中的业务处理。

我们简要查看下查看下TC模块的UML类图

我们通过类图发现,我们比较关心的就RpcServer、AbstractRpcRemotingServer、AbstractRpcRemoting这3个类,那我们查看下我们分支事务注册是如何进行的

RM模块发送请求到TC最终执行到以下方法

然后TC模块接收消息入口为RpcServer类中以下方法

因为我们是注册RM分支事务事件,所以直接会执行super父类进行消息解析和分发,查看父类如何处理的。

因为我们的RM分支事务注册属于请求事件,所以最终会执行以下方法

messageExecutor为Server启动类中定义的线程池执行器

最终调用到RpcServer中dispatch方法进行消息的分发处理

查看RpcServer初始化接口可知

消息被派发到DefaultServerMessageListenerImpl类中进行处理,继续查看

我们的消息属于MergedWarpMessage类型,继续调用results[i] = transactionMessageHandler.onRequest(subMessage, rpcContext);这个transactionMessageHandler就是DefaultCoordinator(核心类之一,事务协调器)。继续分析

这个地方简单解释下,我们的请求BranchRegisterRequest继承AbstractTransactionRequestToTC,然后就会调用到以下方法体

这个handler就是DefaultCoordinator本身实例,因为DefaultCoordinator继承AbstractTCInboundHandler,所以会执行到AbstractTCInboundHandler类中的以下方法

继而调用到DefaultCoordinator类

最终调用到DefaultCore类方法

这个地方很关键,需要重点分析! 大致调用如下:

1:通过Xid获取全局事务GlobalSession,GlobalSession全局事务加锁调用逻辑

2:判断GlobalSession状态,判断状态是否可用或者有没有发起全局事务(Begin状态)

3:创建BranchSession分支事务,其中lockKeys简单理解就是操作的哪些行数据字段

4:BranchSession分支事务加锁,内部加锁大致为根据pk主键值+操作的行数据进行加锁(行锁),加锁逻辑比较复杂,可以在seata.io官网进行查看原理图,如果分支事务获取锁失败了,那么代表有其他分支事务正在处理业务逻辑,抛出锁冲突异常信息,该异常会被ConnectionProxy的以下代码捕获

然后抛出LockConflictException异常

最终AbstractDMLBaseExecutor.executeAutoCommitTrue捕获这个LockConflictException锁冲突异常.异常处理为:1->回滚数据。2->重试获取分支事务锁,默认重试30次,每次sleep间隔时间10ms,直到成功执行commit操作或者抛出锁超时,若抛出锁超时则会一直上抛SQLException到TM发起全局事务的模块,然后由TM模块发起全局回滚消息到TC,由TC下发分支事务回滚消息

5:保存全局事务GlobalSession对应的分支事务BranchSession数据到store,根据SPI接口判断采用file本地文件保存还是按照db数据库进行保存,以便于TC服务端自检操作

6:返回BranchSession分支事务id

重点:上文对分支事务BranchSession加锁了,还没有释放锁,下文将分析BranchSession分支事务锁的释放

我们在上文中第6步获取到分支事务id后,我们继续分析后续操作

1:注册分支事务register()方法,获取到分支事务id,设置当前ConnectionContext上下文的分支事务id

2:判断当前ConnectionContext上下文是否有保存的undo_log数据,若有数据则将数据写入undo_log表

封装BranchUndoLog分支事务数据

将数据插入undo_log表,状态为Normal状态,该状态有Normal、GlobalFinished 两种状态,后文分支事务回滚时候着重讲解这两种状态

3:提交当前分支事务(本地事务)

4:上报分支事务(本地事务)一阶段完成状态PhaseOne_Done

5:清空上下文数据

假设各个分支事务均未出现异常,各个分支事务均已完成一阶段并且上报PhaseOne_Done状态,那么发起全局事务TM模块将执行以下逻辑(TransactionalTemplate.commitTransaction)

然后调用到DefaultGlobalTransaction.commit,最终调用到DefaultTransactionManager.commit

和begin发起全局事务调用过程基本一致,不清楚流程的同学可以查看上文发起begin的调用过程。调用顺序:

1:DefaultTransactionManager.commit->发送commit消息

2:AbstractRpcRemoting.sendRequest->封装/发送请求数据

3:RpcServer.channelRead->TC/seata-server获取请求数据

4:AbstractRpcRemoting.channelRead->TC/seata-server获取请求数据

5:RpcServer.dispatch->消息分发处理

6:DefaultServerMessageListenerImpl.onTrxMessage->消息监听实现类,如:对RpcMessage消息相关处理

7:DefaultCoordinator.onRequest设置消息Handler处理类

8:AbstractTCInboundHandler.handle消息映射处理,根据消息请求类型进行映射处理,commit对应的请求类型为GlobalCommitRequest

9:DefaultCoordinator.doGlobalCommit

10:DefaultCore.commit分析下

10-1:根据xid获取全局事务,注意这里是重点,分支事务锁释放第一现场

此步骤会执行globalSession.setActive(false);设置当前全局事务为不可用状态,这样就不会有分支事务继续注册进globalSession。释放当前分支事务锁,还记得上文讲述过在分支事务提交commit时候需要先向TC注册当前分支事务吗?这个注册过程就涉及到分支事务锁,比如某个全局事务中的分支事务和另外一个全局事务中的分支事务,都在操作某一行数据,那么就要等到这个分支事务锁释放后,其他分支事务才能进行Commit操作,然后去获取分支事务锁.然后判断当前xid对应的全局事务是否是发起状态(begin)

10-2:判断当前全局事务是否可以异步执行,判断是否异步就是判断是否是TCC或者是AT事务类型,是AT事务类型则可以异步执行

将全局事务放入Map集合中

10-3:在启动TC/seata-server时初始化了以下方法

然后开启各种类型的定时任务

异步提交全局事务就将在如下方法中执行

10-4:遍历全局事务sessionMap集合,循坏调用DefaultCore.doGlobalCommit,该过程相对复杂,那就继续分析分析

10-5:eventBus主要数据监控统计作用

10-6:遍历分支事务,执行resourceManagerInbound.branchCommit

10-7:调用DefaultCoordinator.branchCommit

10-8:RpcServer.sendSyncRequest发送消息给RM分支事务模块

10-9:AbstractRpcRemotingClient.channelRead接收消息

10-10:AbstractRpcRemotingClient.dispatch消息分发处理

10-11:RmMessageListener.onMessage该消息监听,该监听器在RMClient中进行初始化设置

10-12:RmMessageListener.handleBranchCommit执行以下方法

10-13:这个handler.onRequest,handler在初始化RMClient中进行设置,所以该handler对象即DefaultRMHandler,随后调用过程为DefaultRMHandler父类AbstractRMHandler.onRequest->DefaultRMHandler.handle->RMHandlerAT父类AbstractRMHandler.handle->AbstractRMHandler.doBranchCommit最终调用如下方法

10-14:getResourceManager()为模版方法,实现类为RMHandlerAT类中,最终获取到的ResourceManager实现类为DataSourceManager

10-15:DataSourceManager.branchCommit将分支事务加入asyncWorker定时任务中

asyncWorker在SPI接口加载DataSourceManager时初始化

10-16:最终调用asyncWorker.branchCommit

Offer操作加入队列,然后直接返回分支状态,即PhaseTwo_Committed状态

简要分析下分支事务二阶段提交做了哪些事情吧,便于更好理解流程

10-16-1:判断队列是否有任务,若有任务,则封装mappedContexts集合数据

10-16-2:遍历mappedContexts集合,获取DataSourceProxy数据源代理对象,然后获取Connection数据库连接对象

10-16-3: 最终执行UndoLogManager.batchDeleteUndoLog删除undo_log表中xid和branch_id字段同时出现在xids集合和branchIds集合中的数据行,最终分支事务完成二阶段提交

10-17:在10-16步骤中,分支事务返回PhaseTwo_Committed状态后,在10-12步骤中

RmMessageListener.handleBranchCommit方法中通过sender.sendResponse(request, serverAddress, resultMessage)将handler.onRequest结果发给TC/seata-server

10-18:TC/seata-server接收数据流程简要概括,RpcServer.channelRead->消息赋值

10-19:最终我们10-6步骤就得到了BranchStatus分支事务状态,即PhaseTwo_Committed状态

11:TC/seata-server获取到PhaseTwo_Committed状态之后或者其他状态会按照不同策略进行处理,我们简要分析PhaseTwo_Committed状态

分析globalSession.removeBranch(branchSession)做了哪些事情

12:往sessionStore记录当前分支状态,如file本地文件或者db数据库,释放当前分支事务锁,最后分支事务集合中删除当前分支事务,其实这个地方,笔者也有一些疑虑,就是上文中commit第一现场时候已经释放了分支事务锁,为何这里还要进行释放?笔者根据Rollback回滚初步判断,可能是为了处理Rollback回滚,因为回滚过程第一现场只是将globalSession设置为不可用状态,所以需要在removeBranch中进行锁释放

至此,整个Commit过程分析完毕,涵盖分支事务一阶段,分支事务二阶段和TC服务端的一系列数据交互过程

上文分析了Commit过程,我们接着分析全局回滚Rollback过程,触发全局回滚Rollback大致分为两类:1:全局事务发起端内部异常被捕获。2:发起端调用下游业务端,下游业务端主动上抛各种异常信息被发起端捕获.

最终在发起端TransactionalTemplate.completeTransactionAfterThrowing进行异常捕获

最终发起全局回滚请求

和上文讲述的TransactionalTemplate.commitTransaction全局提交流程基本完全一致

大致调用过程为:

1:DefaultGlobalTransaction.rollback,最终调用到DefaultTransactionManager.rollback

2:AbstractRpcRemoting.sendRequest->封装/发送请求数据

3:RpcServer.channelRead->TC/seata-server获取请求数据

4:AbstractRpcRemoting.channelRead->TC/seata-server获取请求数据

5:RpcServer.dispatch->消息分发处理

6:DefaultServerMessageListenerImpl.onTrxMessage->消息监听实现类,如:对RpcMessage消息相关处理

7:DefaultCoordinator.onRequest设置消息Handler处理类

8:AbstractTCInboundHandler.handle消息映射处理,根据消息请求类型进行映射处理,rollback对应的请求类型为GlobalRollbackRequest

9:DefaultCoordinator.doGlobalRollback

10:DefaultCore.rollback分析下

11:根据xid获取全局事务,然后设置当前globalSession不可用,然后判断当前xid对应的全局事务是否是发起状态(begin)

12:DefaultCore.doGlobalRollback

13:遍历分支事务,执行resourceManagerInbound.branchRollback

14:调用DefaultCoordinator.branchRollback

15:RpcServer.sendSyncRequest发送消息给RM分支事务模块

16:AbstractRpcRemotingClient.channelRead接收消息

17:AbstractRpcRemotingClient.dispatch消息分发处理

18:RmMessageListener.onMessage该消息监听,该监听器在RMClient中进行初始化设置

19:RmMessageListener.handleBranchRollback执行以下方法

20:这个handler.onRequest,handler在初始化RMClient中进行设置,所以该handler对象即DefaultRMHandler,随后调用过程为DefaultRMHandler父类

AbstractRMHandler.onRequest->DefaultRMHandler.handle->RMHandlerAT父类AbstractRMHandler.handle->AbstractRMHandler.doBranchRollback最终调用如下方法

21:getResourceManager()为模版方法,实现类为RMHandlerAT类中,最终获取到的ResourceManager实现类为DataSourceManager

22:DataSourceManager.branchRollback

23:UndoLogManager.undo回滚步骤着重分析

24:查询undo_log表是否存在branchId、xid对应的数据

25:canUndo判断当前数据状态是否是Normal正常状态,如不是Normal状态则跳出while循坏,Normal状态是分支事务插入,是正常执行流程、GlobalFinished状态为异常状态,是全局事务发起的防御性插入,比如全局回滚时,分支事务还没执行,此时就需要插入防御性数据,用主键冲突来防止异常分支事务的插入,起了一个占位作用

26:解析undo_log数据行,获取TableMeta表数据,主要封装数据为表的所有列名字段信息allColumns,表的所有索引信息allIndexes

27:根据执行器进行执行,如:Insert操作会使用afterImage数据进行删除当前数据库中的数据进行数据回滚操作

28:如果undo_log表中存在branchId、xid对应的数据则删除分支事务undo_log表中branchId、xid对应的数据,如果undo_log中不存在branchId、xid对应的数据则防御性插入一条branchId、xid数据

注释信息已经解释的很清晰insertUndoLogWithGlobalFinished的作用,主要起防御性的作用,因为分支事务和全局事务都是异步RPC调用,所以为了防止一些异常情况进行的占位操作

至此对Seata AT模式整个TM,RM,TC调用流程分析完毕,若有不正确的地方欢迎指出!

上一篇下一篇

猜你喜欢

热点阅读