seata TCC模式

2019-10-29  本文已影响0人  紫色红色黑色

描述

TCC是相对于AT比较偏上层。是针对接口而实现的。要求全局事务发起者接口添加@GlobalTransactional,参与者接口prepare()添加@TwoPhaseBusinessAction,并制定commit方法和rollback方法。

适合在暴露RPC接口时使用,在OutService层方法中添加@GlobalTransactional开启全局事务。在InnerService层中方法中添加@TwoPhaseBusinessAction(name = "", commitMethod = "commit", rollbackMethod = "rollback")注解指定commit和rollback方法

spring中配置GlobalTransactionScanner初始化。该类实现AbstractAutoProxyCreatorInitializingBean。首先会执行wrapIfNecessary()

初始化

1.注册资源,封装TCCResource并缓存。生成ResourceId,将ResourceId注册到TC
2.设置@TwoPhaseBusinessAction的拦截器为TccActionInterceptor,设置@GlobalTransactional的拦截器为GlobalTransactionalInterceptor

一阶段

TM

TM处理流程和AT模式下一样,
1.开启全局事务,向TC注册全局事务并返回XID
2.如果业务执行成功,通知TC全局事务提交
3.如果业务执行失败,通知TC全局事务回滚
4.清除内存中XID

RM

注册分支事务,获取branchId。并将方法调用时的上下文发送给TC。执行业务逻辑

public Map<String, Object> proceed(Method method, Object[] arguments, String xid, TwoPhaseBusinessAction businessAction,
                                    Callback<Object> targetCallback) throws Throwable {
    Map<String, Object> ret = new HashMap<String, Object>(16);

    //TCC name
    String actionName = businessAction.name();
    BusinessActionContext actionContext = new BusinessActionContext();
    actionContext.setXid(xid);
    //set action anme
    actionContext.setActionName(actionName);
    //TODO services

    //Creating Branch Record
    String branchId = doTccActionLogStore(method, arguments, businessAction, actionContext);
    actionContext.setBranchId(branchId);

    //set the parameter whose type is BusinessActionContext
    Class<?>[] types = method.getParameterTypes();
    int argIndex = 0;
    for (Class<?> cls : types) {
        if (cls.getName().equals(BusinessActionContext.class.getName())) {
            arguments[argIndex] = actionContext;
            break;
        }
        argIndex++;
    }
    //the final parameters of the try method
    ret.put(Constants.TCC_METHOD_ARGUMENTS, arguments);
    //the final result
    ret.put(Constants.TCC_METHOD_RESULT, targetCallback.execute());
    return ret;
}

二阶段

TM通知TC全局提交/回滚。TC通知各分支事务。RM处理消息逻辑是RMHandlerTCC里。

RM

提交和回滚逻辑一样。
1.用TC发过来的ResourceId查到TCResource,找到commit/rollback方法并执行
2.通知TC执行结果

public BranchStatus branchCommit(BranchType branchType, String xid, long branchId, String resourceId,
                                    String applicationData) throws TransactionException {
    TCCResource tccResource = (TCCResource)tccResourceCache.get(resourceId);
    if (tccResource == null) {
        throw new ShouldNeverHappenException("TCC resource is not exist, resourceId:" + resourceId);
    }
    Object targetTCCBean = tccResource.getTargetBean();
    Method commitMethod = tccResource.getCommitMethod();
    if (targetTCCBean == null || commitMethod == null) {
        throw new ShouldNeverHappenException("TCC resource is not available, resourceId:" + resourceId);
    }
    try {
        boolean result = false;
        //BusinessActionContext
        BusinessActionContext businessActionContext = getBusinessActionContext(xid, branchId, resourceId,
            applicationData);
        Object ret = commitMethod.invoke(targetTCCBean, businessActionContext);
        LOGGER.info(
            "TCC resource commit result :" + ret + ", xid:" + xid + ", branchId:" + branchId + ", resourceId:"
                + resourceId);
        if (ret != null) {
            if (ret instanceof TwoPhaseResult) {
                result = ((TwoPhaseResult)ret).isSuccess();
            } else {
                result = (boolean)ret;
            }
        }
        return result ? BranchStatus.PhaseTwo_Committed : BranchStatus.PhaseTwo_CommitFailed_Retryable;
    } catch (Throwable t) {
        String msg = String.format("commit TCC resource error, resourceId: %s, xid: %s.", resourceId, xid);
        LOGGER.error(msg, t);
        throw new FrameworkException(t, msg);
    }
}

引用

https://zhuanlan.zhihu.com/p/61981170
https://blog.csdn.net/hosaos/article/details/89847554

上一篇 下一篇

猜你喜欢

热点阅读