2-fescar(seata)源码分析-全局事务开始

2019-02-19  本文已影响0人  致虑

2-fescar源码分析-全局事务开始

一、官方介绍

1.TM 向 TC 申请开启一个全局事务,全局事务创建成功并生成一个全局唯一的 XID。
2.XID 在微服务调用链路的上下文中传播。

那这一篇主要分析fescar如何开启一个事务,TM 如何向 TC 申请开启一个全局事务,全局事务如何创建成功并生成一个全局唯一的 XID。并将XID在微服务中进行传递。

--

二、(原理)源码分析

紧接着上一篇的server启动分析,依然借助官网的example例图进行出发。

2.1 demo
项目中存在官方的example模块,里面就模拟了上图的相关流程:先回到本节主题:**全局事务的开端**
2.2.主服务入口

--

2.2 相关RPC服务初始化。
2.2 主服务执行事务方法体。
2.3.获取全局事务
2.4.TM 开始全局事务
2.5.TC 收到消息,开启全局事务
AbstractRpcRemoting读取到消息后,进行消息的分发,继续跟踪: 
![image.png](https://img.haomeiwen.com/i12071549/1205389e2183fb77.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)


监听器已经捕获到事务开启的消息,进而处理器进一步进行处理:

```
#DefaultCoordinator 根据request类型分发处理器处理消息
@Override
public AbstractResultMessage onRequest(AbstractMessage request, RpcContext context) {
    if (!(request instanceof AbstractTransactionRequestToTC)) {
        throw new IllegalArgumentException();
    }
    AbstractTransactionRequestToTC transactionRequest = (AbstractTransactionRequestToTC)request;
    transactionRequest.setTCInboundHandler(this);

    return transactionRequest.handle(context);
}

#GlobalBeginRequest
@Override
public AbstractTransactionResponse handle(RpcContext rpcContext) {
    return handler.handle(this, rpcContext);
}

#AbstractTCInboundHandler 异常处理模板执行后,回到DefaultCoordinator#doGlobalBegin处理逻辑
@Override
public GlobalBeginResponse handle(GlobalBeginRequest request, final RpcContext rpcContext) {
    GlobalBeginResponse response = new GlobalBeginResponse();
    exceptionHandleTemplate(new Callback<GlobalBeginRequest, GlobalBeginResponse>() {
        @Override
        public void execute(GlobalBeginRequest request, GlobalBeginResponse response) throws TransactionException {
            doGlobalBegin(request, response, rpcContext);
        }
    }, request, response);
    return response;
}

#AbstractExceptionHandler
public void exceptionHandleTemplate(Callback callback, AbstractTransactionRequest request, AbstractTransactionResponse response) {
    try {
        callback.execute(request, response);
        response.setResultCode(ResultCode.Success);

    } catch (TransactionException tex) {
        response.setTransactionExceptionCode(tex.getCode());
        response.setResultCode(ResultCode.Failed);
        response.setMsg("TransactionException[" + tex.getMessage() + "]");

    } catch (RuntimeException rex) {
        response.setResultCode(ResultCode.Failed);
        response.setMsg("RuntimeException[" + rex.getMessage() + "]");
    }
}

#DefaultCoordinator
@Override
protected void doGlobalBegin(GlobalBeginRequest request, GlobalBeginResponse response, RpcContext rpcContext)
    throws TransactionException {
    response.setXid(core.begin(rpcContext.getApplicationId(), rpcContext.getTransactionServiceGroup(),
        request.getTransactionName(), request.getTimeout()));
}

#DefaultCore 开始逻辑
@Override
public String begin(String applicationId, String transactionServiceGroup, String name, int timeout) throws TransactionException {
    GlobalSession session = GlobalSession.createGlobalSession(
            applicationId, transactionServiceGroup, name, timeout);
    session.addSessionLifecycleListener(SessionHolder.getRootSessionManager());

    session.begin();

    return XID.generateXID(session.getTransactionId());
}

#GlobalSession 获取GlobalSession
public GlobalSession(String applicationId, String transactionServiceGroup, String transactionName, int timeout) {
    this.transactionId = UUIDGenerator.generateUUID();
    this.status = GlobalStatus.Begin;

    this.applicationId = applicationId;
    this.transactionServiceGroup = transactionServiceGroup;
    this.transactionName = transactionName;
    this.timeout = timeout;
}

#GlobalSession GlobalSession开始逻辑
@Override
public void begin() throws TransactionException {
    this.status = GlobalStatus.Begin;
    this.beginTime = System.currentTimeMillis();
    this.active = true;
    for (SessionLifecycleListener lifecycleListener : lifecycleListeners) {
        lifecycleListener.onBegin(this);
    }
}

#AbstractSessionManager 将GlobalSession进行缓存
@Override
public void addGlobalSession(GlobalSession session) throws TransactionException {
    if (LOGGER.isDebugEnabled()) {
        LOGGER.debug("MANAGER[" + name + "] SESSION[" + session + "] " + LogOperation.GLOBAL_ADD);
    }
    transactionStoreManager.writeSession(LogOperation.GLOBAL_ADD, session);
    sessionMap.put(session.getTransactionId(), session);

}

#XID 生成全局的XID并且返回
public static String generateXID(long tranId) {
    return ipAddress + ":" + port + ":" + tranId;
}
```

那么对于上面的执行流程分析一下:

- 1.SocketChannel read到消息,并且进行消息的分发
- 2.根据消息类型寻找对应的消息处理器,此处是GlobalBeginRequest请求,那么自然寻找到GlobalBeginResponse handle逻辑
- 3.统一的异常模板处理后,进入核心处理逻辑DefaultCoordinator#doGlobalBegin,开始进行处理
- 4.根据applicationId, transactionServiceGroup创造全局事务session:GlobalSession,同时设置session中全局事务状态为GlobalStatus.Begin,生成uuid的transactionId。
- 5.将对应的sessionManager加入lifecycleListeners集合,以管控整个session生命周期
- 6.开始session的生命周期,且设置相关开始时间,并将session以<session.getTransactionId(), session>map进行缓存进SessionManager
- 7.根据TransactionId生成XID,以Response形式返回给TM。
- 8.完成事务的开启逻辑。

--

2.6.TM 收到处理结果(XID),继续回到TM跟踪
三.未完待续。。。

后续分析主要还是根据example官方实例分为:分支事务注册、事务回滚、事务提交进行。
同时后续每一流程都紧密关联Server,因此还会频繁回到上叙server启动后,收到消息被触发的后续逻辑。

上一篇 下一篇

猜你喜欢

热点阅读