2 - Fescar (Seata) Source Code Analysis: Beginning a Global Transaction
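I. Official introduction
1. The TM asks the TC to begin a global transaction. The global transaction is created and a globally unique XID is generated.
2. The XID is propagated through the context of the microservice call chain.
This article analyzes how Fescar begins a transaction: how the TM asks the TC to begin a global transaction, how the global transaction is created and assigned a globally unique XID, and how that XID is then passed along between microservices.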
--
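II. (Principle) Source code analysis
Following on from the previous article on server startup, we again start from the example diagram on the official site.
2.1 demo
- First, another look at the architecture diagram from the official site:
[Figure: architecture diagram from the official site; the image failed to upload in the source]
The project ships with an official example module that simulates the flow in the diagram above. Back to the topic of this section: **the start of a global transaction**.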
2.2. Main service entry point
-
1. Start the main service: BusinessServiceImpl
```
public class BusinessServiceImpl implements BusinessService {
    ...
    @Override
    @GlobalTransactional(timeoutMills = 300000, name = "dubbo-demo-tx")
    public void purchase(String userId, String commodityCode, int orderCount) {
        LOGGER.info("purchase begin ... xid: " + RootContext.getXID());
        storageService.deduct(commodityCode, orderCount);
        orderService.create(userId, commodityCode, orderCount);
        throw new RuntimeException("xxx");
    }
    ...
    public static void main(String[] args) {
        ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(new String[] {"dubbo-business.xml"});
        final BusinessService business = (BusinessService)context.getBean("business");
        business.purchase("U100001", "C00321", 2);
    }
}
```
```
public class StorageServiceImpl implements StorageService {
    private JdbcTemplate jdbcTemplate;

    @Override
    public void deduct(String commodityCode, int count) {
        LOGGER.info("Storage Service Begin ... xid: " + RootContext.getXID());
        LOGGER.info("Deducting inventory SQL: update storage_tbl set count = count - {} where commodity_code = {}",count,commodityCode);
        jdbcTemplate.update("update storage_tbl set count = count - ? where commodity_code = ?", new Object[] {count, commodityCode});
        LOGGER.info("Storage Service End ... ");
    }

    public static void main(String[] args) throws Throwable {
        String applicationId = "dubbo-demo-storage-service";
        String txServiceGroup = "my_test_tx_group";
        RMClientAT.init(applicationId, txServiceGroup);
        ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(new String[]{"dubbo-storage-service.xml"});
        context.getBean("service");
        JdbcTemplate jdbcTemplate = (JdbcTemplate) context.getBean("jdbcTemplate");
        jdbcTemplate.update("delete from storage_tbl where commodity_code = 'C00321'");
        jdbcTemplate.update("insert into storage_tbl(commodity_code, count) values ('C00321', 100)");
        new ApplicationKeeper(context).keep();
    }
}
```
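- 1. BusinessServiceImpl is started through main. Its purchase method then calls logic in other RPC services; because several write services are involved, a distributed transaction is required.
- 2. StorageServiceImpl starts its service through its main method; deduct is the write method called directly by BusinessServiceImpl.
- 3. OrderServiceImpl and the other services follow the same pattern.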
--
2.2 Initializing the RPC services
-
Taking the startup of StorageServiceImpl as an example, first look at the dubbo-storage-service.xml configuration:
```
<bean id="storageDataSourceProxy" class="com.alibaba.fescar.rm.datasource.DataSourceProxy">
    <constructor-arg ref="storageDataSource" />
</bean>
<bean id="jdbcTemplate" class="org.springframework.jdbc.core.JdbcTemplate">
    <property name="dataSource" ref="storageDataSourceProxy" />
</bean>
<dubbo:application name="dubbo-demo-storage-service" />
<dubbo:registry address="multicast://224.5.6.7:1234?unicast=false" />
<dubbo:protocol name="dubbo" port="20882" />
<dubbo:service interface="com.alibaba.fescar.tm.dubbo.StorageService" ref="service" timeout="10000"/>
<bean id="service" class="com.alibaba.fescar.tm.dubbo.impl.StorageServiceImpl">
    <property name="jdbcTemplate" ref="jdbcTemplate"/>
</bean>
<bean class="com.alibaba.fescar.spring.annotation.GlobalTransactionScanner">
    <constructor-arg value="dubbo-demo-storage-service"/>
    <constructor-arg value="my_test_tx_group"/>
</bean>
```
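Key point: when the service initializes, it loads the GlobalTransactionScanner class. What does this class do? It sets up the SocketChannel inside the RPC service, connecting the service to the TC server and keeping that connection alive.
Let's trace further.
2.2.1 Initializing the SocketChannel inside the service
Continue tracing the GlobalTransactionScanner logic: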
```
@Override
public void afterPropertiesSet() {
    if (disableGlobalTransaction) {
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Global transaction is disabled.");
        }
        return;
    }
    initClient();
}

private void initClient() {
    ...
    TMClient.init(applicationId, txServiceGroup);
    ...
}
```
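Using the configured applicationId and txServiceGroup, the RM side is initialized through RMClientAT.init (the excerpt above shows only the TMClient.init call):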
```
public class RMClientAT {
    public static void init(String applicationId, String transactionServiceGroup) {
        RmRpcClient rmRpcClient = RmRpcClient.getInstance(applicationId, transactionServiceGroup);
        AsyncWorker asyncWorker = new AsyncWorker();
        asyncWorker.init();
        DataSourceManager.init(asyncWorker);
        rmRpcClient.setResourceManager(DataSourceManager.get());
        rmRpcClient.setClientMessageListener(new RmMessageListener(new RMHandlerAT()));
        rmRpcClient.init();
    }
}
```
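- 1. Obtain an RmRpcClient instance for the given applicationId and transactionServiceGroup
- 2. Create an AsyncWorker, an asynchronous task executor
- 3. Initialize the AsyncWorker
- 4. Hand the AsyncWorker to DataSourceManager
- 5. Set the DataSourceManager and the message listener on the RmRpcClient
- 6. Initialize the rmRpcClient
Let's go through these step by step: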
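- RmRpcClient: a Netty client.
  Its parent class, AbstractRpcRemotingClient, holds the Netty-related worker thread group, bootstrap components, and so on.
  All communication with the server goes through this RmRpcClient.
- Construct the asynchronous worker, AsyncWorker, and initialize it: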
```
public synchronized void init() {
    LOGGER.info("Async Commit Buffer Limit: " + ASYNC_COMMIT_BUFFER_LIMIT);
    timerExecutor = new ScheduledThreadPoolExecutor(1, new NamedThreadFactory("AsyncWorker", 1, true));
    timerExecutor.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            try {
                doBranchCommits();
            } catch (Throwable e) {
                LOGGER.info("Failed at async committing ... " + e.getMessage());
            }
        }
    }, 10, 1000 * 1, TimeUnit.MILLISECONDS);
}
```
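This starts a single thread that polls for pending branch-commit tasks once per second and commits any it finds. Committing a branch transaction is simple: it mainly consists of deleting the undo (rollback) log. Here is the logic: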
```
private void doBranchCommits() {
    if (ASYNC_COMMIT_BUFFER.size() == 0) {
        return;
    }
    ...
    for (String resourceId : mappedContexts.keySet()) {
        Connection conn = null;
        try {
            ...
            for (Phase2Context commitContext : contextsGroupedByResourceId) {
                try {
                    // Phase-two commit of a branch only needs to delete its undo log
                    UndoLogManager.deleteUndoLog(commitContext.xid, commitContext.branchId, conn);
                } catch (Exception ex) {
                    LOGGER.warn("Failed to delete undo log [" + commitContext.branchId + "/" + commitContext.xid + "]", ex);
                }
            }
        } finally {
            ...
        }
    }
}
```
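This iterates over the ASYNC_COMMIT_BUFFER collection and deletes the corresponding undo log records. My guess for now is that the entries in ASYNC_COMMIT_BUFFER are saved while the TM runs the transactional business logic in execute; we will come back to this later.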
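The polling pattern described above can be sketched in isolation. This is a minimal illustration, not Fescar's AsyncWorker: the class AsyncCommitSketch, its CommitTask type, and the in-memory deletedUndoLogs list (standing in for the undo log table) are all hypothetical names introduced here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for an async commit worker; not the Fescar class.
class AsyncCommitSketch {
    // One pending phase-two commit: identifies the undo log entry to delete.
    static final class CommitTask {
        final String xid;
        final long branchId;
        CommitTask(String xid, long branchId) { this.xid = xid; this.branchId = branchId; }
    }

    // Bounded buffer, analogous in spirit to ASYNC_COMMIT_BUFFER.
    private final BlockingQueue<CommitTask> buffer = new LinkedBlockingQueue<>(10000);
    // Assumption: a list of "xid/branchId" strings replaces real undo log deletion.
    final List<String> deletedUndoLogs = new ArrayList<>();

    // Producer side: returns false when the buffer is full.
    boolean enqueue(String xid, long branchId) {
        return buffer.offer(new CommitTask(xid, branchId));
    }

    // Drain everything currently buffered and record the deletions.
    synchronized int drainOnce() {
        List<CommitTask> batch = new ArrayList<>();
        buffer.drainTo(batch);
        for (CommitTask task : batch) {
            deletedUndoLogs.add(task.xid + "/" + task.branchId);
        }
        return batch.size();
    }

    // Poll the buffer at a fixed rate on one daemon thread.
    ScheduledThreadPoolExecutor start(long periodMillis) {
        ScheduledThreadPoolExecutor timer = new ScheduledThreadPoolExecutor(1, r -> {
            Thread t = new Thread(r, "async-commit-sketch");
            t.setDaemon(true);
            return t;
        });
        timer.scheduleAtFixedRate(() -> {
            try {
                drainOnce();
            } catch (Throwable e) {
                // Swallow so that one failure does not cancel the periodic task.
            }
        }, 10, periodMillis, TimeUnit.MILLISECONDS);
        return timer;
    }

    public static void main(String[] args) {
        AsyncCommitSketch worker = new AsyncCommitSketch();
        worker.enqueue("127.0.0.1:8091:1", 11L);
        worker.enqueue("127.0.0.1:8091:1", 12L);
        System.out.println(worker.drainOnce() + " task(s) drained: " + worker.deletedUndoLogs);
    }
}
```

Catching Throwable inside the scheduled task matters: scheduleAtFixedRate stops rescheduling a task once one run throws, which is presumably why the original init() wraps doBranchCommits() the same way.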
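- Next, the asyncWorker is handed to the data-layer manager, DataSourceManager, which is what actually executes rollback, commit, and similar logic.
- Finally, the DataSourceManager is assigned to rmRpcClient, and rmRpcClient is initialized: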
```
@Override
public void init() {
    if (initialized.compareAndSet(false, true)) {
        super.init();
        timerExecutor.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                reconnect();
            }
        }, SCHEDULE_INTERVAL_MILLS, SCHEDULE_INTERVAL_MILLS, TimeUnit.SECONDS);
        ...
    }
}

private void reconnect() {
    for (String serverAddress : serviceManager.lookup(transactionServiceGroup)) {
        if (serverAddress != null) {
            try {
                connect(serverAddress);
            } catch (Exception e) {
                ...
            }
        }
    }
}

@Override
protected Channel connect(String serverAddress) {
    Channel channelToServer = channels.get(serverAddress);
    if (channelToServer != null) {
        channelToServer = getExistAliveChannel(channelToServer, serverAddress);
        if (null != channelToServer) {
            return channelToServer;
        }
    }
    ...
    channelLocks.putIfAbsent(serverAddress, new Object());
    Object connectLock = channelLocks.get(serverAddress);
    synchronized (connectLock) {
        Channel channel = doConnect(serverAddress);
        return channel;
    }
}

private Channel doConnect(String serverAddress) {
    Channel channelToServer = channels.get(serverAddress);
    ...
        channelFromPool = nettyClientKeyPool.borrowObject(poolKeyMap.get(serverAddress));
    } catch (Exception exx) {
        ...
    return channelFromPool;
}
```
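Initializing rmRpcClient boils down to borrowing, from nettyClientKeyPool, a channel that is connected to the server and returning it.
At this point the GlobalTransactionScanner logic is essentially complete. Its core job is to equip each RPC service with a channel connected to the server.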
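The connect logic shown earlier (reuse a live channel, otherwise take a per-address lock and create one) can be sketched without Netty. Everything here is an assumption for illustration: FakeChannel stands in for a Netty Channel, and the pool borrow is replaced by a plain constructor call.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of per-address channel reuse; not the Fescar client.
class ChannelCacheSketch {
    // Stand-in for a Netty Channel.
    static final class FakeChannel {
        final String address;
        volatile boolean active = true;
        FakeChannel(String address) { this.address = address; }
    }

    private final ConcurrentMap<String, FakeChannel> channels = new ConcurrentHashMap<>();
    private final ConcurrentMap<String, Object> channelLocks = new ConcurrentHashMap<>();
    // Counts how many real "connections" were made.
    final AtomicInteger connectCount = new AtomicInteger();

    FakeChannel connect(String serverAddress) {
        // Fast path: reuse a cached channel that is still alive.
        FakeChannel existing = channels.get(serverAddress);
        if (existing != null && existing.active) {
            return existing;
        }
        // Slow path: one lock object per address, so different servers do not block each other.
        channelLocks.putIfAbsent(serverAddress, new Object());
        Object lock = channelLocks.get(serverAddress);
        synchronized (lock) {
            // Re-check under the lock: another thread may have connected meanwhile.
            FakeChannel current = channels.get(serverAddress);
            if (current != null && current.active) {
                return current;
            }
            FakeChannel fresh = new FakeChannel(serverAddress);
            connectCount.incrementAndGet();
            channels.put(serverAddress, fresh);
            return fresh;
        }
    }

    public static void main(String[] args) {
        ChannelCacheSketch client = new ChannelCacheSketch();
        FakeChannel a = client.connect("127.0.0.1:8091");
        FakeChannel b = client.connect("127.0.0.1:8091");
        System.out.println("same channel reused: " + (a == b));
    }
}
```

A periodic reconnect task, like the one scheduled in init(), would simply call connect for every known server address; dead channels fail the liveness check and get replaced.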
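2.2 The main service executes the transactional method
-
Once the server, OrderService, AccountService, and BusinessServiceImpl have started, return to the entry point of the method being executed: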
```
@Override
@GlobalTransactional(timeoutMills = 300000, name = "dubbo-demo-tx")
public void purchase(String userId, String commodityCode, int orderCount) {
    LOGGER.info("purchase begin ... xid: " + RootContext.getXID());
    storageService.deduct(commodityCode, orderCount);
    orderService.create(userId, commodityCode, orderCount);
    throw new RuntimeException("xxx");
}
```
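So how is this call handled by the TM, submitted to the server, and how does it finally trigger the RM's rollback or commit logic?
Key point: @GlobalTransactional
- GlobalTransactional
  It is applied as an annotation and woven in through Spring, which keeps intrusion into business code low. Go straight to the interceptor that handles it, GlobalTransactionalInterceptor, and look at its interception logic:
- GlobalTransactionalInterceptor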
```
@Override
public Object invoke(final MethodInvocation methodInvocation) throws Throwable {
    final GlobalTransactional anno = getAnnotation(methodInvocation.getMethod());
    if (anno != null) {
        try {
            /**
             * Proxy the annotated method by passing a TransactionalExecutor
             * to TransactionalTemplate.execute()
             */
            return transactionalTemplate.execute(new TransactionalExecutor() {
                @Override
                public Object execute() throws Throwable {
                    return methodInvocation.proceed();
                }

                @Override
                public int timeout() {
                    return anno.timeoutMills();
                }

                @Override
                public String name() {
                    String name = anno.name();
                    if (!StringUtils.isEmpty(name)) {
                        return name;
                    }
                    return formatMethod(methodInvocation.getMethod());
                }
            });
        } catch (TransactionalExecutor.ExecutionException e) {
            TransactionalExecutor.Code code = e.getCode();
            switch (code) {
                case RollbackDone:
                    throw e.getOriginalException();
                case BeginFailure:
                    failureHandler.onBeginFailure(e.getTransaction(), e.getCause());
                    throw e.getCause();
                case CommitFailure:
                    failureHandler.onCommitFailure(e.getTransaction(), e.getCause());
                    throw e.getCause();
                case RollbackFailure:
                    failureHandler.onRollbackFailure(e.getTransaction(), e.getCause());
                    throw e.getCause();
                default:
                    throw new ShouldNeverHappenException("Unknown TransactionalExecutor.Code: " + code);
            }
        }
    }
    return methodInvocation.proceed();
}
```
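The logic above intercepts annotated methods. For a method carrying @GlobalTransactional, the business method runs inside TransactionalTemplate.execute (through the TransactionalExecutor callback, which calls methodInvocation.proceed()) and its result is returned; a method without the annotation falls through to the final return methodInvocation.proceed(). TransactionalTemplate.execute is therefore the actual entry point of the transaction. Let's trace on: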
- TransactionalTemplate
```
public Object execute(TransactionalExecutor business) throws TransactionalExecutor.ExecutionException {
    // 1. get or create a transaction
    GlobalTransaction tx = GlobalTransactionContext.getCurrentOrCreate();

    // 2. begin transaction
    try {
        tx.begin(business.timeout(), business.name());
    } catch (TransactionException txe) {
        throw new TransactionalExecutor.ExecutionException(tx, txe,TransactionalExecutor.Code.BeginFailure);
    }

    Object rs = null;
    try {
        // Do Your Business
        rs = business.execute();
    } catch (Throwable ex) {
        // 3. any business exception, rollback.
        try {
            tx.rollback();
            // 3.1 Successfully rolled back
            throw new TransactionalExecutor.ExecutionException(tx, TransactionalExecutor.Code.RollbackDone, ex);
        } catch (TransactionException txe) {
            // 3.2 Failed to rollback
            throw new TransactionalExecutor.ExecutionException(tx, txe,TransactionalExecutor.Code.RollbackFailure, ex);
        }
    }

    // 4. everything is fine, commit.
    try {
        tx.commit();
    } catch (TransactionException txe) {
        // 4.1 Failed to commit
        throw new TransactionalExecutor.ExecutionException(tx, txe, TransactionalExecutor.Code.CommitFailure);
    }
    return rs;
}
```
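The flow in the code above is clear:
- 1. Get the global transaction and begin it
- 2. Execute the business logic
- 3. Roll back on exception
- 4. Commit the transaction
The core of this article, obtaining a global transaction and then beginning it, has now appeared:
```
// 1. get or create a transaction
GlobalTransaction tx = GlobalTransactionContext.getCurrentOrCreate();
tx.begin(business.timeout(), business.name());
```
Next, we focus on these two steps.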
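Before going into those two steps, the begin / execute / rollback-or-commit shape of the template can be reduced to a few lines. This is a simplified sketch under stated assumptions: RecordingTx is a hypothetical transaction handle that only records which calls were made, and unlike the original (which catches Throwable and wraps failures in ExecutionException) it handles RuntimeException only and rethrows it unchanged.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical sketch of the template-method flow; not Fescar's TransactionalTemplate.
class TxTemplateSketch {
    // Records the lifecycle calls instead of talking to a coordinator.
    static final class RecordingTx {
        final List<String> events = new ArrayList<>();
        void begin(String name) { events.add("begin:" + name); }
        void commit() { events.add("commit"); }
        void rollback() { events.add("rollback"); }
    }

    static <T> T execute(RecordingTx tx, String name, Supplier<T> business) {
        // 1. begin before any business code runs
        tx.begin(name);
        T result;
        try {
            // 2. run the business logic
            result = business.get();
        } catch (RuntimeException ex) {
            // 3. any business failure triggers a rollback, then propagates
            tx.rollback();
            throw ex;
        }
        // 4. no failure: commit
        tx.commit();
        return result;
    }

    public static void main(String[] args) {
        RecordingTx tx = new RecordingTx();
        try {
            execute(tx, "dubbo-demo-tx", () -> { throw new RuntimeException("xxx"); });
        } catch (RuntimeException expected) {
            System.out.println(tx.events);
        }
    }
}
```

The main method mirrors the demo's purchase method, which always throws, so the recorded path is begin followed by rollback.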
--
2.3. Getting the global transaction
-
GlobalTransactionContext: the container for global transactions
```
/**
 * Get GlobalTransaction instance bind on current thread.
 * Create a new on if no existing there.
 *
 * @return new context if no existing there.
 */
public static GlobalTransaction getCurrentOrCreate() {
    GlobalTransaction tx = getCurrent();
    if (tx == null) {
        return createNew();
    }
    return tx;
}

public static GlobalTransaction getCurrent() {
    GlobalTransaction tx = THREAD_TRANSACTION_CONTEXT.get();
    if (tx != null) {
        return tx;
    }
    String xid = RootContext.getXID();
    if (xid == null) {
        return null;
    }
    tx = new DefaultGlobalTransaction(xid);
    THREAD_TRANSACTION_CONTEXT.set(tx);
    return THREAD_TRANSACTION_CONTEXT.get();
}

private static GlobalTransaction createNew() {
    GlobalTransaction tx = new DefaultGlobalTransaction();
    THREAD_TRANSACTION_CONTEXT.set(tx);
    return THREAD_TRANSACTION_CONTEXT.get();
}

DefaultGlobalTransaction() {
    this(null);
}

DefaultGlobalTransaction(String xid) {
    this.transactionManager = DefaultTransactionManager.get();
    this.xid = xid;
    if (xid != null) {
        status = GlobalStatus.Begin;
        role = GlobalTransactionRole.Participant;
    }
}
```
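- 1. Get the global transaction bound to the current thread
- 2. If there is none, create a default global transaction, DefaultGlobalTransaction
- 3. Cache the global transaction in a ThreadLocal, so it is effectively keyed by the current thread
- 4. Because there is no XID yet, the status of the global transaction at this point is still GlobalStatus.UnKnown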
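The get-or-create behavior in the list above can be sketched with two ThreadLocals. The names TxContextSketch, Tx, and bindXid are hypothetical; the second ThreadLocal stands in for RootContext, and the role strings mirror the Launcher and Participant roles seen in the source.

```java
// Hypothetical sketch of a thread-bound transaction context; not Fescar's GlobalTransactionContext.
class TxContextSketch {
    static final class Tx {
        final String xid;  // null until a coordinator has assigned one
        final String role; // "Launcher" starts the transaction, "Participant" joins one
        Tx(String xid) {
            this.xid = xid;
            this.role = (xid == null) ? "Launcher" : "Participant";
        }
    }

    // Transaction object cached per thread.
    private static final ThreadLocal<Tx> CURRENT = new ThreadLocal<>();
    // Assumption: stands in for RootContext, which carries an XID received from upstream.
    private static final ThreadLocal<String> XID = new ThreadLocal<>();

    static void bindXid(String xid) { XID.set(xid); }

    static void clear() {
        CURRENT.remove();
        XID.remove();
    }

    static Tx getCurrentOrCreate() {
        Tx tx = CURRENT.get();
        if (tx != null) {
            return tx; // already cached on this thread
        }
        // An existing XID means we join that transaction; otherwise we will launch a new one.
        tx = new Tx(XID.get());
        CURRENT.set(tx);
        return tx;
    }

    public static void main(String[] args) {
        Tx launcher = getCurrentOrCreate();
        System.out.println(launcher.role + ", xid=" + launcher.xid);
        clear();
        bindXid("127.0.0.1:8091:7");
        Tx participant = getCurrentOrCreate();
        System.out.println(participant.role + ", xid=" + participant.xid);
        clear();
    }
}
```

Because the cache is a ThreadLocal, it should be cleared when the work finishes, especially on pooled threads, or a later request on the same thread could see a stale transaction.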
--
2.4. The TM begins the global transaction
-
```
tx.begin(business.timeout(), business.name());
```
Tracing further:
```
// DefaultGlobalTransaction
@Override
public void begin(int timeout, String name) throws TransactionException {
    if (xid == null && role == GlobalTransactionRole.Launcher) {
        xid = transactionManager.begin(null, null, name, timeout);
        status = GlobalStatus.Begin;
        RootContext.bind(xid);
    } else {
        if (xid == null) {
            throw new ShouldNeverHappenException(role + " is NOT in a global transaction context.");
        }
        LOGGER.info(role + " is already in global transaction " + xid);
    }
}
```
Here the role is the transaction initiator, Launcher, so begin proceeds:
```
// DefaultTransactionManager
@Override
public String begin(String applicationId, String transactionServiceGroup, String name, int timeout) throws TransactionException {
    GlobalBeginRequest request = new GlobalBeginRequest();
    request.setTransactionName(name);
    request.setTimeout(timeout);
    GlobalBeginResponse response = (GlobalBeginResponse) syncCall(request);
    return response.getXid();
}

private AbstractTransactionResponse syncCall(AbstractTransactionRequest request) throws TransactionException {
    try {
        return (AbstractTransactionResponse) TmRpcClient.getInstance().sendMsgWithResponse(request);
    } catch (TimeoutException toe) {
        throw new TransactionException(TransactionExceptionCode.IO, toe);
    }
}

// TmRpcClient
@Override
public Object sendMsgWithResponse(Object msg, long timeout) throws TimeoutException {
    String svrAddr = XID.getServerAddress(RootContext.getXID());
    String validAddress = svrAddr != null ? svrAddr : loadBalance();
    Channel acquireChannel = connect(validAddress);
    Object result = super.sendAsyncRequestWithResponse(validAddress, acquireChannel, msg, timeout);
    if (result instanceof GlobalBeginResponse
        && ((GlobalBeginResponse)result).getResultCode() == ResultCode.Failed) {
        LOGGER.error("begin response error,release channel:" + acquireChannel);
        releaseChannel(acquireChannel, validAddress);
    }
    return result;
}

// AbstractRpcRemoting
private Object sendAsyncRequest(String address, Channel channel, Object msg, long timeout) throws TimeoutException {
    ...
    ChannelFuture future;
    channelWriteableCheck(channel, msg);
    future = channel.writeAndFlush(rpcMessage);
    future.addListener(new ChannelFutureListener() {
        @Override
        public void operationComplete(ChannelFuture future) {
            if (!future.isSuccess()) {
                MessageFuture messageFuture = futures.remove(rpcMessage.getId());
                if (messageFuture != null) {
                    messageFuture.setResultMessage(future.cause());
                }
                destroyChannel(future.channel());
            }
        }
    });
}
```
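To summarize the begin flow above:
- 1. TransactionalTemplate calls begin on the global transaction (DefaultGlobalTransaction)
- 2. DefaultTransactionManager builds a GlobalBeginRequest and sends the begin message through TmRpcClient
- 3. TmRpcClient obtains a channel that is already connected and sends the message over it, triggering the start of the transaction.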
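The code above sends the request asynchronously over the channel yet returns a response to the caller, which implies correlating responses with requests by message id (the futures map keyed by rpcMessage.getId()). A minimal sketch of that idea follows, using CompletableFuture in place of Fescar's MessageFuture. The SyncCallSketch class and its BiConsumer "transport" are hypothetical, and timeouts are surfaced as an unchecked exception here purely to keep the example short.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;

// Hypothetical sketch of a synchronous call layered over an asynchronous transport.
class SyncCallSketch {
    private final AtomicLong nextId = new AtomicLong();
    // Pending requests, keyed by message id.
    private final ConcurrentMap<Long, CompletableFuture<String>> futures = new ConcurrentHashMap<>();
    // Assumption: (messageId, payload) -> fire-and-forget send, standing in for channel.writeAndFlush.
    private final BiConsumer<Long, String> transport;

    SyncCallSketch(BiConsumer<Long, String> transport) {
        this.transport = transport;
    }

    String sendAndWait(String payload, long timeoutMillis) {
        long id = nextId.incrementAndGet();
        CompletableFuture<String> future = new CompletableFuture<>();
        // Register before sending, so a fast response cannot be missed.
        futures.put(id, future);
        try {
            transport.accept(id, payload);
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            throw new IllegalStateException("no response for message " + id, e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            futures.remove(id);
        }
    }

    // Called by the reading side when a response carrying this id arrives.
    void onResponse(long id, String response) {
        CompletableFuture<String> future = futures.get(id);
        if (future != null) {
            future.complete(response);
        }
    }

    public static void main(String[] args) {
        // Loopback transport: the "server" answers immediately on the same thread.
        SyncCallSketch[] client = new SyncCallSketch[1];
        client[0] = new SyncCallSketch((id, msg) -> client[0].onResponse(id, "xid-for-" + msg));
        System.out.println(client[0].sendAndWait("begin", 1000));
    }
}
```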
2.5. The TC receives the message and begins the global transaction
-
2.5.1. Receiving the TM's begin message
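As mentioned in the earlier article on the server, messages are received in the channelRead method of AbstractRpcRemoting. Stepping through it in a debugger:
After AbstractRpcRemoting reads the message, it dispatches it. Tracing further: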
![image.png](https://img.haomeiwen.com/i12071549/1205389e2183fb77.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
The listener has now captured the begin-transaction message, and the handler processes it further:
```
// DefaultCoordinator: dispatches the message to a handler according to the request type
@Override
public AbstractResultMessage onRequest(AbstractMessage request, RpcContext context) {
    if (!(request instanceof AbstractTransactionRequestToTC)) {
        throw new IllegalArgumentException();
    }
    AbstractTransactionRequestToTC transactionRequest = (AbstractTransactionRequestToTC)request;
    transactionRequest.setTCInboundHandler(this);
    return transactionRequest.handle(context);
}

// GlobalBeginRequest
@Override
public AbstractTransactionResponse handle(RpcContext rpcContext) {
    return handler.handle(this, rpcContext);
}

// AbstractTCInboundHandler: after the exception-handling template runs,
// control returns to DefaultCoordinator#doGlobalBegin
@Override
public GlobalBeginResponse handle(GlobalBeginRequest request, final RpcContext rpcContext) {
    GlobalBeginResponse response = new GlobalBeginResponse();
    exceptionHandleTemplate(new Callback<GlobalBeginRequest, GlobalBeginResponse>() {
        @Override
        public void execute(GlobalBeginRequest request, GlobalBeginResponse response) throws TransactionException {
            doGlobalBegin(request, response, rpcContext);
        }
    }, request, response);
    return response;
}

// AbstractExceptionHandler
public void exceptionHandleTemplate(Callback callback, AbstractTransactionRequest request, AbstractTransactionResponse response) {
    try {
        callback.execute(request, response);
        response.setResultCode(ResultCode.Success);
    } catch (TransactionException tex) {
        response.setTransactionExceptionCode(tex.getCode());
        response.setResultCode(ResultCode.Failed);
        response.setMsg("TransactionException[" + tex.getMessage() + "]");
    } catch (RuntimeException rex) {
        response.setResultCode(ResultCode.Failed);
        response.setMsg("RuntimeException[" + rex.getMessage() + "]");
    }
}

// DefaultCoordinator
@Override
protected void doGlobalBegin(GlobalBeginRequest request, GlobalBeginResponse response, RpcContext rpcContext)
    throws TransactionException {
    response.setXid(core.begin(rpcContext.getApplicationId(), rpcContext.getTransactionServiceGroup(),
        request.getTransactionName(), request.getTimeout()));
}

// DefaultCore: the begin logic
@Override
public String begin(String applicationId, String transactionServiceGroup, String name, int timeout) throws TransactionException {
    GlobalSession session = GlobalSession.createGlobalSession(
        applicationId, transactionServiceGroup, name, timeout);
    session.addSessionLifecycleListener(SessionHolder.getRootSessionManager());
    session.begin();
    return XID.generateXID(session.getTransactionId());
}

// GlobalSession: constructing a GlobalSession
public GlobalSession(String applicationId, String transactionServiceGroup, String transactionName, int timeout) {
    this.transactionId = UUIDGenerator.generateUUID();
    this.status = GlobalStatus.Begin;
    this.applicationId = applicationId;
    this.transactionServiceGroup = transactionServiceGroup;
    this.transactionName = transactionName;
    this.timeout = timeout;
}

// GlobalSession: the session's begin logic
@Override
public void begin() throws TransactionException {
    this.status = GlobalStatus.Begin;
    this.beginTime = System.currentTimeMillis();
    this.active = true;
    for (SessionLifecycleListener lifecycleListener : lifecycleListeners) {
        lifecycleListener.onBegin(this);
    }
}

// AbstractSessionManager: caches the GlobalSession
@Override
public void addGlobalSession(GlobalSession session) throws TransactionException {
    if (LOGGER.isDebugEnabled()) {
        LOGGER.debug("MANAGER[" + name + "] SESSION[" + session + "] " + LogOperation.GLOBAL_ADD);
    }
    transactionStoreManager.writeSession(LogOperation.GLOBAL_ADD, session);
    sessionMap.put(session.getTransactionId(), session);
}

// XID: generates the global XID and returns it
public static String generateXID(long tranId) {
    return ipAddress + ":" + port + ":" + tranId;
}
```
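Walking through the flow above:
- 1. The SocketChannel reads the message and dispatches it
- 2. A handler is chosen by message type. This is a GlobalBeginRequest, so GlobalBeginRequest#handle is invoked, which delegates to the handler's handle overload that returns a GlobalBeginResponse
- 3. After passing through the common exception-handling template, control reaches the core logic, DefaultCoordinator#doGlobalBegin
- 4. A global transaction session, GlobalSession, is created from applicationId and transactionServiceGroup; the session's global transaction status is set to GlobalStatus.Begin, and a transactionId is generated by UUIDGenerator
- 5. The corresponding SessionManager is added to the lifecycleListeners collection so that it can manage the whole session lifecycle
- 6. The session lifecycle begins: the begin time is recorded, and the session is cached in the SessionManager's map as <session.getTransactionId(), session>
- 7. An XID is generated from the transactionId and returned to the TM in the response
- 8. The begin-transaction logic is complete.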
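Steps 4 to 7 can be condensed into a small in-memory coordinator. This is a sketch, not the TC implementation: CoordinatorSketch and Session are hypothetical, an AtomicLong replaces UUIDGenerator, and the two parsing helpers are assumptions that simply invert the ip:port:transactionId format produced by generateXID.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical in-memory sketch of the coordinator's begin step.
class CoordinatorSketch {
    static final class Session {
        final long transactionId;
        final String name;
        final int timeout;
        volatile long beginTime;
        volatile boolean active;
        Session(long transactionId, String name, int timeout) {
            this.transactionId = transactionId;
            this.name = name;
            this.timeout = timeout;
        }
    }

    private final String ip;
    private final int port;
    // Assumption: a simple counter stands in for UUIDGenerator.
    private final AtomicLong idGenerator = new AtomicLong();
    // Session cache keyed by transactionId, like sessionMap in the source.
    final ConcurrentMap<Long, Session> sessions = new ConcurrentHashMap<>();

    CoordinatorSketch(String ip, int port) {
        this.ip = ip;
        this.port = port;
    }

    // Create the session, mark it started, cache it, and hand back the XID.
    String begin(String name, int timeout) {
        Session session = new Session(idGenerator.incrementAndGet(), name, timeout);
        session.beginTime = System.currentTimeMillis();
        session.active = true;
        sessions.put(session.transactionId, session);
        return ip + ":" + port + ":" + session.transactionId;
    }

    // Everything before the last ':' is the coordinator address.
    static String serverAddress(String xid) {
        return xid.substring(0, xid.lastIndexOf(':'));
    }

    // Everything after the last ':' is the transaction id.
    static long transactionId(String xid) {
        return Long.parseLong(xid.substring(xid.lastIndexOf(':') + 1));
    }

    public static void main(String[] args) {
        CoordinatorSketch tc = new CoordinatorSketch("127.0.0.1", 8091);
        String xid = tc.begin("dubbo-demo-tx", 300000);
        System.out.println(xid + " -> " + serverAddress(xid) + ", " + transactionId(xid));
    }
}
```

Embedding the coordinator address in the XID is what lets a client route follow-up requests for the same transaction to the server that created it, as the XID.getServerAddress call in TmRpcClient suggests.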
--
2.6. The TM receives the result (the XID); back to tracing the TM
-
Synchronizing the result
```
// DefaultGlobalTransaction
@Override
public void begin(int timeout, String name) throws TransactionException {
    if (xid == null && role == GlobalTransactionRole.Launcher) {
        xid = transactionManager.begin(null, null, name, timeout);
        status = GlobalStatus.Begin;
        RootContext.bind(xid);
    } else {
        if (xid == null) {
            throw new ShouldNeverHappenException(role + " is NOT in a global transaction context.");
        }
        LOGGER.info(role + " is already in global transaction " + xid);
    }
}

// RootContext
public static void bind(String xid) {
    if (LOGGER.isDebugEnabled()) {
        LOGGER.debug("bind " + xid);
    }
    CONTEXT_HOLDER.put(KEY_XID, xid);
}
```
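- 1. After receiving the returned XID, the TM sets the status of the global transaction it maintains to GlobalStatus.Begin.
- 2. The returned XID is stored in the context container (in effect a ThreadLocal on the current thread), keeping the TM consistent with the session state on the server.
At this point the whole begin-transaction flow has been analyzed. The final state is:
- 1. On the server, the GlobalSession holds the global transaction's status and related information, including the transactionId from which the XID is built
- 2. On the TM, the RootContext holds the XID, and the DefaultGlobalTransaction holds the global transaction's status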
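Once the XID is bound on the TM's thread, it still has to travel with each RPC call so that downstream services can join the same transaction. The source has not shown that code yet, so the following is only a sketch of the general idea under loud assumptions: XidPropagationSketch, the "TX_XID" key value, and the attachments map (standing in for RPC call metadata) are all hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical sketch of carrying an XID across an RPC boundary.
class XidPropagationSketch {
    // Assumption: the attachment key name is made up for this example.
    static final String KEY_XID = "TX_XID";
    private static final ThreadLocal<String> XID = new ThreadLocal<>();

    static void bind(String xid) { XID.set(xid); }

    static String getXid() { return XID.get(); }

    static String unbind() {
        String xid = XID.get();
        XID.remove();
        return xid;
    }

    // Caller side: copy the thread-bound XID into the outgoing call metadata.
    static Map<String, String> outbound() {
        Map<String, String> attachments = new HashMap<>();
        String xid = getXid();
        if (xid != null) {
            attachments.put(KEY_XID, xid);
        }
        return attachments;
    }

    // Callee side: bind the received XID for the duration of the handler, then always unbind.
    static <T> T inbound(Map<String, String> attachments, Supplier<T> handler) {
        String xid = attachments.get(KEY_XID);
        if (xid == null) {
            return handler.get(); // not part of a global transaction
        }
        bind(xid);
        try {
            return handler.get();
        } finally {
            unbind();
        }
    }

    public static void main(String[] args) {
        bind("127.0.0.1:8091:7");
        Map<String, String> attachments = outbound();
        unbind(); // simulate crossing to another thread or process
        String seen = inbound(attachments, XidPropagationSketch::getXid);
        System.out.println("callee saw xid: " + seen);
    }
}
```

The finally block is the important design choice: worker threads are usually pooled, so an XID left bound after one call would leak into whatever request the thread serves next.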
--
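III. To be continued...
The following articles continue with the official example and cover branch transaction registration, transaction rollback, and transaction commit.
Since each of those flows is tightly coupled to the server, we will also frequently return to the logic that is triggered when the server, once started, receives a message.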